21020: Adjust spacing to follow PEP 8
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import hashlib
6 import os
7 import errno
8 import pycurl
9 import random
10 import re
11 import shutil
12 import socket
13 import sys
14 import stat
15 import tempfile
16 import time
17 import unittest
18 import urllib.parse
19 import mmap
20
21 from unittest import mock
22 from unittest.mock import patch
23
24 import parameterized
25
26 import arvados
27 import arvados.retry
28 import arvados.util
29 from . import arvados_testutil as tutil
30 from . import keepstub
31 from . import run_test_server
32
33 from .arvados_testutil import DiskCacheBase
34
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip put/get/head tests against a live test Keep server.

    Parameterized to run once with the disk-backed block cache and once
    with the in-memory cache (``disk_cache`` True/False).
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # Holds the DiskCacheBase instance whose block cache is shared by all
    # tests in the class; torn down in tearDownClass().
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(
            api_client=cls.api_client,
            proxy='', local_store='',
            block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # BUG FIX: this previously called super().setUpClass(), which re-ran
        # class setup during teardown instead of cleaning up.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(
            tutil.binary_compare(self.keep_client.get(foo_locator), b'foo'),
            'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # Double 8 bytes 23 times -> a 64 MiB block.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # The Python 2 UnicodeEncodeError branch was removed: this module
        # already requires Python 3 (it imports urllib.parse).
        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
144
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Exercise blob-signing permissions: signed vs. unsigned locators,
    cross-user reads, and unauthenticated reads must all behave correctly
    when the Keep server runs with blob signing enabled.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(
            block_cache=self.make_block_cache(self.disk_cache))

        # A PUT with signing enabled returns a locator with an +A signature.
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(
            keep_client.get(foo_locator), b'foo',
            'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => bad request
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get(unsigned_bar_locator)

        # GET from a different user => bad request
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.KeepReadError):
            arvados.Keep.get(bar_locator)

        # Unauthenticated GET => bad request, whether the locator is
        # signed or unsigned.
        keep_client.api_token = ''
        for locator in (bar_locator, unsigned_bar_locator):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get(locator)
192
193
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Tests for reaching Keep through a keepproxy, including the
    ARVADOS_KEEP_SERVICES environment/config override."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(
            api_client=self.api_client, local_store='',
            block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(
            api_client=self.api_client,
            local_store='',
            block_cache=self.make_block_cache(self.disk_cache))
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            # The constructor raises, so there is nothing to bind; the
            # previous `keep_client =` assignment never completed anyway.
            arvados.KeepClient(
                api_client=self.api_client,
                local_store='',
                block_cache=self.make_block_cache(self.disk_cache))
243
244
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """KeepClient behavior against mocked services: service discovery,
    TLS verification flags, pycurl timeout configuration, and error
    aggregation across retries. No real servers are used.
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        # Helper: the parsed service-root URLs for a mocked API client,
        # sorted so tests see a deterministic order.
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(
            api_client=self.mock_keep_services(
                service_type='proxy', service_host='localhost', service_port=9, count=1),
            block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2', num_retries=0)
        except Exception:
            # (was a bare `except:`) any error is acceptable here; only
            # the discovery side effect matters.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest + '+A' + blk_sig
        remote_loc = blk_digest + '+R' + blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator': local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            # HEAD requests set no low-speed limits.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            # HEAD requests set no low-speed limits.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        # NOTE(review): clearing disk_cache_dir and omitting block_cache
        # below looks deliberate (no disk cache for this case) — confirm
        # against DiskCacheBase.tearDown.
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        # When service discovery itself fails, the raised error should
        # carry zero per-request errors.
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            num_retries=0,
        )
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        # Only the errors from the final retry round (the 502s) should be
        # reported, not the earlier 500s.
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        # Only the two failed services appear in the error report.
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        # One response reporting 3 stored replicas satisfies copies=2
        # without further requests.
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
580
581
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify which requests are served from the block cache:
    GETs are cached, HEADs are not."""
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, mock_get):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # The second GET is served from the cache, so only one backend
        # request is made.
        self.assertEqual(1, mock_get.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, mock_get):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # HEAD requests are never cached (so they can't be confused with
        # GETs); both calls reach the backend.
        self.assertEqual(2, mock_get.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, mock_get):
        mock_get.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # The first response came from a HEAD and was therefore not
        # cached, so the GET received a fresh (different) response.
        self.assertNotEqual(head_resp, get_resp)
623
624
625 @tutil.skip_sleep
626 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
627 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
628     disk_cache = False
629
    def setUp(self):
        """Build a KeepClient over two mocked Keep services, plus fixture
        data, a locator, and a pre-validated request ID for the tests."""
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None
641
    def tearDown(self):
        # Delegate cleanup (e.g. of any disk-cache directory) to DiskCacheBase.
        DiskCacheBase.tearDown(self)
644
645     def test_default_to_api_client_request_id(self):
646         self.api_client.request_id = self.test_id
647         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
648             self.keep_client.put(self.data)
649         self.assertEqual(2, len(mock.responses))
650         for resp in mock.responses:
651             self.assertProvidedRequestId(resp)
652
653         with tutil.mock_keep_responses(self.data, 200) as mock:
654             self.keep_client.get(self.locator)
655         self.assertProvidedRequestId(mock.responses[0])
656
657         with tutil.mock_keep_responses(b'', 200) as mock:
658             self.keep_client.head(self.locator)
659         self.assertProvidedRequestId(mock.responses[0])
660
661     def test_explicit_request_id(self):
662         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
663             self.keep_client.put(self.data, request_id=self.test_id)
664         self.assertEqual(2, len(mock.responses))
665         for resp in mock.responses:
666             self.assertProvidedRequestId(resp)
667
668         with tutil.mock_keep_responses(self.data, 200) as mock:
669             self.keep_client.get(self.locator, request_id=self.test_id)
670         self.assertProvidedRequestId(mock.responses[0])
671
672         with tutil.mock_keep_responses(b'', 200) as mock:
673             self.keep_client.head(self.locator, request_id=self.test_id)
674         self.assertProvidedRequestId(mock.responses[0])
675
676     def test_automatic_request_id(self):
677         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
678             self.keep_client.put(self.data)
679         self.assertEqual(2, len(mock.responses))
680         for resp in mock.responses:
681             self.assertAutomaticRequestId(resp)
682
683         with tutil.mock_keep_responses(self.data, 200) as mock:
684             self.keep_client.get(self.locator)
685         self.assertAutomaticRequestId(mock.responses[0])
686
687         with tutil.mock_keep_responses(b'', 200) as mock:
688             self.keep_client.head(self.locator)
689         self.assertAutomaticRequestId(mock.responses[0])
690
691     def test_request_id_in_exception(self):
692         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
693             with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
694                 self.keep_client.head(self.locator, request_id=self.test_id)
695
696         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
697             with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
698                 self.keep_client.get(self.locator)
699
700         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
701             with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
702                 self.keep_client.put(self.data, request_id=self.test_id)
703
704         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
705             with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
706                 self.keep_client.put(self.data)
707
708     def assertAutomaticRequestId(self, resp):
709         hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
710                if x.startswith('X-Request-Id: ')][0]
711         self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
712         self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
713
714     def assertProvidedRequestId(self, resp):
715         self.assertIn('X-Request-Id: '+self.test_id,
716                       resp.getopt(pycurl.HTTPHEADER))
717
718
@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Pin KeepClient's rendezvous-hash probe order to a reference set.

    These tests hard-code the expected service probe sequences for known
    block hashes, so any change to the hashing/ordering code shows up as
    a regression.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # One test block per reference ordering, plus its md5 digest
        # (the locator hash) for get/head tests.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for each test block against all-failing services.

        Every response is a 500, so each op (called with num_retries=1)
        walks the full probe sequence twice; the requested URLs must
        match expected_order[i] repeated twice.
        """
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            # Extract the per-service hex digit from each requested URL.
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        """With copies>1, writer threads race; allow bounded reordering.

        A request may appear at most copies-1 positions earlier than
        its slot in the reference probe sequence (see invariant below).
        """
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        """Adding services should only slightly displace first probes.

        This checks the consistent-hashing property of the rendezvous
        ordering: each old first-choice server may slide down the probe
        list by at most the number of services added.
        """
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                # Penalty: how far the old first choice slid down the list.
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Fail every request for the all-zeros block; confirm the raised
        exception lists servers in the reference probe order."""
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        # '3eab2d5fc9681074' is expected_order[0]: the reference probe
        # order for the 64-zeros block (see setUp).
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
877
878
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Timeout behavior of KeepClient against a throttleable stub server.

    The stub server (self.server, from keepstub.StubKeepServers) lets
    tests limit bandwidth (setbandwidth) and inject delays at various
    points in the request cycle (setdelays).

    Fix over the previous revision: removed an unused ``as e`` binding
    on one assertRaises context manager (flake8 F841); no behavior
    change.
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting its body runs in [tmin, tmax] seconds.

        Subclasses unittest.TestCase only to borrow its assert*
        methods; it is never run as a test, so super().__init__ is
        deliberately not called.
        """

        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting its body takes at least tmin seconds.

        Like assertTakesBetween, TestCase is subclassed only for its
        assert* methods.
        """

        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient with the given timeout tuple.

        The tuple is passed straight through as KeepClient(timeout=...);
        tests below rely on the first element bounding connection setup.
        """
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        """Twice the low-speed limit is enough bandwidth to succeed."""
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1025
1026
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Exercise locator hints (+K@...) that route requests to gateways."""
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Mock an API client advertising disk and gateway services, and
        build self.keepClient around it."""
        self.gateways = []
        for n in range(gateways):
            self.gateways.append({
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(n),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(n),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
            })
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        """The gateway named by the hint serves both get and head."""
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(
            self.gateway_roots[0] + locator,
            MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        fakes = [tutil.FakeCurl.make(code=404, body='')
                 for _ in range(gateways + disks)]
        MockCurl.side_effect = tutil.queue_with(fakes)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(
            ['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
            ['K@' + gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given by the hints.
        for idx, root in enumerate(self.gateway_roots):
            self.assertEqual(
                root + locator, fakes[idx].getopt(pycurl.URL).decode())
        # Once the gateways are exhausted, disk services are tried.
        for idx in range(gateways, gateways + disks):
            self.assertRegex(fakes[idx].getopt(pycurl.URL).decode(), r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        fakes = [tutil.FakeCurl.make(code=404, body=b'')
                 for _ in range(gateways + disks)]
        MockCurl.side_effect = tutil.queue_with(fakes)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(
            ['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
            ['K@' + gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given by the hints.
        for idx, root in enumerate(self.gateway_roots):
            self.assertEqual(
                root + locator, fakes[idx].getopt(pycurl.URL).decode())
        # Once the gateways are exhausted, disk services are tried.
        for idx in range(gateways, gateways + disks):
            self.assertRegex(fakes[idx].getopt(pycurl.URL).decode(), r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        # A +K@<cluster-id> hint routes the request to that cluster's
        # keep proxy address.
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(
            'https://keep.xyzzy.arvadosapi.com/' + locator,
            MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        # A +K@<cluster-id> hint routes the request to that cluster's
        # keep proxy address.
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual(
            'https://keep.xyzzy.arvadosapi.com/' + locator,
            MockCurl.return_value.getopt(pycurl.URL).decode())
1128
1129
class KeepClientRetryTestMixin(object):
    """Shared machinery for exercising KeepClient retry logic.

    Testing with a local Keep store won't exercise the retry behavior,
    so the strategy is:

    * Create a client with one proxy specified (pointed at a black
      hole), so there's no need to instantiate an API client, and all
      HTTP requests come from one place.
    * Mock httplib's request method to provide simulated responses.

    This lets the retry logic be tested extensively without relying on
    any supporting servers, and prevents side effects in case something
    hiccups.  Concrete test classes must define DEFAULT_EXPECT,
    DEFAULT_EXCEPTION, run_method(), and TEST_PATCHER (a method that
    mocks out the appropriate client internals).
    """
    disk_cache = False

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Build a KeepClient; caller kwargs override the defaults."""
        kwargs = dict(self.client_kwargs)
        kwargs.update(caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method() returns expected (DEFAULT_EXPECT if None)."""
        want = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(want, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method() raises error_class; return the context."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as caught:
            self.run_method(*args, **kwargs)
        return caught

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # A 403 is permanent: the queued 200 must never be reached.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # Two failures with num_retries=1 exhausts the retry budget
        # before the queued success.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            caught = self.check_exception(num_retries=1)
        self.assertRegex(str(caught.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # A constructor-level num_retries applies when the call itself
        # doesn't pass one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1200
1201
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get(), via KeepClientRetryTestMixin."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # The mixin drives all retries through this single entry point.
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A 404 surfaces as NotFoundError rather than a generic error."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise NotFoundError only when a high threshold of
        # servers agree the block is missing.  Rig a 50/50 split (one
        # 404, one 500) and confirm the generic KeepReadError wins.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        """A 200 from the +K@ hint server suffices despite queued failures."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        """A socket timeout on one server falls through to the next."""
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        """Data failing checksum verification triggers a retry elsewhere."""
        with tutil.mock_keep_responses(
                ('baddata', 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1249
1250
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head(), via KeepClientRetryTestMixin."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # The mixin drives all retries through this single entry point.
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A 404 surfaces as NotFoundError rather than a generic error."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise NotFoundError only when a high threshold of
        # servers agree the block is missing.  Rig a 50/50 split (one
        # 404, one 500) and confirm the generic KeepReadError wins.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        """A 200 from the +K@ hint server suffices despite queued failures."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        """A socket timeout on one server falls through to the next."""
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1292
1293
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.put(), via KeepClientRetryTestMixin."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        # The mixin drives all retries through this single entry point.
        return self.new_client().put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        """With one server responding, copies=2 must fail rather than
        write the same block to the same server twice."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1311
1312
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Exercise KeepWriterThreadPool's replication accounting.

    The pool should stop issuing PUTs once the requested number of copies
    has been confirmed, even when some services are slow, fail, or raise.
    """

    class FakeKeepService(object):
        """Stand-in for a Keep service with scripted put() behavior.

        put() sleeps for `delay` seconds, then raises `will_raise` if set,
        otherwise returns `will_succeed`.  A successful result reports
        `replicas` stored copies via the response headers.
        """

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {}
            self._result['headers'] = {}
            self._result['headers']['x-keep-replicas-stored'] = str(replicas)
            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
            self._result['body'] = 'foobar'

        def put(self, data_hash, data, timeout, headers):
            # Simulate network latency before the scripted outcome.
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if self.will_succeed:
                return self._result
            else:
                return {"status_code": 500, "body": "didn't succeed"}

        def finished(self):
            return False

    def setUp(self):
        # PEP 8: no spaces around '=' in keyword arguments.
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def test_only_write_enough_on_success(self):
        # All services succeed; the pool stops at exactly `copies` writes.
        for i in range(10):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Half the services fail with a 500; the pool still reaches `copies`.
        for i in range(5):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Half the services raise exceptions; the pool still reaches `copies`.
        for i in range(5):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # With only copies-1 healthy services, the pool ends one copy short.
        for i in range(self.copies+1):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
        for i in range(self.copies-1):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies-1, []))
1384
1385
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """put()s that need two distinct servers to succeed, possibly
    requiring multiple passes through the retry loop."""

    # NOTE(review): sibling classes declare `disk_cache = False` here;
    # confirm `block_cache` is the intended attribute name.
    block_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        # Remove any disk cache created by DiskCacheBase during the test.
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # One PUT raises; the retry pass stores the second copy, so three
        # requests happen in total.
        responses = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3',
            Exception('mock err'), 200, 200)
        with responses as keep_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, keep_mock.call_count)

    def test_success_after_retryable_error(self):
        # A 500 is retryable; the retry pass stores the second copy.
        responses = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3',
            500, 200, 200)
        with responses as keep_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, keep_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        responses = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3',
            200, 400, 200)
        with responses as keep_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, keep_mock.call_count)
1425
1426
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    """Regression test: API-layer errors must not deadlock later get()s."""
    disk_cache = False

    def tearDown(self):
        # Remove any disk cache created by DiskCacheBase during the test.
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Provide just enough of the API client surface to construct a
            # KeepClient; any other attribute access behaves like a broken
            # API connection.
            def __getattr__(self, r):
                if r == "api_token":
                    return "abc"
                if r == "insecure":
                    return False
                if r == "config":
                    return lambda: {}
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(
            api_client=ApiMock(),
            proxy='', local_store='',
            block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not keepstore)
        # exception is thrown as part of a get(), the next attempt to get
        # that same block will result in a deadlock.  This is why there are
        # two get()s in a row.  Unfortunately, the failure mode for this
        # test is that the test suite deadlocks; there isn't a good way to
        # avoid that without adding a special case that has no use except
        # for this test.
        for _ in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1461
1462
1463 class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    def setUp(self):
        # Mock API with two Keep services; each test gets a fresh temporary
        # directory to use as its disk cache.
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'  # md5 of self.data
        self.disk_cache_dir = tempfile.mkdtemp()
1469
    def tearDown(self):
        # Remove the temporary disk cache directory and everything in it.
        shutil.rmtree(self.disk_cache_dir)
1472
1473     @mock.patch('arvados.KeepClient.KeepService.get')
1474     def test_disk_cache_read(self, get_mock):
1475         # confirm it finds an existing cache block when the cache is
1476         # initialized.
1477
1478         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1479         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1480             f.write(self.data)
1481
1482         # block cache should have found the existing block
1483         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1484                                                   disk_cache_dir=self.disk_cache_dir)
1485         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1486
1487         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1488
1489         get_mock.assert_not_called()
1490
1491     @mock.patch('arvados.KeepClient.KeepService.get')
1492     def test_disk_cache_share(self, get_mock):
1493         # confirm it finds a cache block written after the disk cache
1494         # was initialized.
1495
1496         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1497                                                   disk_cache_dir=self.disk_cache_dir)
1498         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1499
1500         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1501         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1502             f.write(self.data)
1503
1504         # when we try to get the block, it'll check the disk and find it.
1505         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1506
1507         get_mock.assert_not_called()
1508
1509     def test_disk_cache_write(self):
1510         # confirm the cache block was created
1511
1512         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1513                                                   disk_cache_dir=self.disk_cache_dir)
1514         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1515
1516         with tutil.mock_keep_responses(self.data, 200) as mock:
1517             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1518
1519         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1520
1521         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1522             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1523
1524     def test_disk_cache_clean(self):
1525         # confirm that a tmp file in the cache is cleaned up
1526
1527         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1528         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1529             f.write(b"abc1")
1530
1531         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1532             f.write(b"abc2")
1533
1534         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1535             f.write(b"abc3")
1536
1537         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1538         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1539         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1540
1541         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1542                                                   disk_cache_dir=self.disk_cache_dir)
1543
1544         # The tmp still hasn't been deleted because it was created in the last 60 seconds
1545         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1546         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1547         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1548
1549         # Set the mtime to 61s in the past
1550         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1551         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1552         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1553
1554         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1555                                                    disk_cache_dir=self.disk_cache_dir)
1556
1557         # Tmp should be gone but the other ones are safe.
1558         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1559         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1560         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1561
1562     @mock.patch('arvados.KeepClient.KeepService.get')
1563     def test_disk_cache_cap(self, get_mock):
1564         # confirm that the cache is kept to the desired limit
1565
1566         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1567         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1568             f.write(self.data)
1569
1570         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1571         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1572             f.write(b"foo")
1573
1574         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1575         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1576
1577         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1578                                                    disk_cache_dir=self.disk_cache_dir,
1579                                                    max_slots=1)
1580
1581         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1582         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1583
1584     @mock.patch('arvados.KeepClient.KeepService.get')
1585     def test_disk_cache_share(self, get_mock):
1586         # confirm that a second cache doesn't delete files that belong to the first cache.
1587
1588         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1589         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1590             f.write(self.data)
1591
1592         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1593         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1594             f.write(b"foo")
1595
1596         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1597         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1598
1599         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1600                                                    disk_cache_dir=self.disk_cache_dir,
1601                                                    max_slots=2)
1602
1603         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1604         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1605
1606         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1607                                                    disk_cache_dir=self.disk_cache_dir,
1608                                                    max_slots=1)
1609
1610         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1611         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1612
1613     def test_disk_cache_error(self):
1614         os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1615
1616         # Fail during cache initialization.
1617         with self.assertRaises(OSError):
1618             block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1619                                                       disk_cache_dir=self.disk_cache_dir)
1620
1621     def test_disk_cache_write_error(self):
1622         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1623                                                   disk_cache_dir=self.disk_cache_dir)
1624
1625         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1626
1627         # Make the cache dir read-only
1628         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1629         os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1630
1631         # Cache fails
1632         with self.assertRaises(arvados.errors.KeepCacheError):
1633             with tutil.mock_keep_responses(self.data, 200) as mock:
1634                 keep_client.get(self.locator)
1635
1636     def test_disk_cache_retry_write_error(self):
1637         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1638                                                   disk_cache_dir=self.disk_cache_dir)
1639
1640         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1641
1642         called = False
1643         realmmap = mmap.mmap
1644         def sideeffect_mmap(*args, **kwargs):
1645             nonlocal called
1646             if not called:
1647                 called = True
1648                 raise OSError(errno.ENOSPC, "no space")
1649             else:
1650                 return realmmap(*args, **kwargs)
1651
1652         with patch('mmap.mmap') as mockmmap:
1653             mockmmap.side_effect = sideeffect_mmap
1654
1655             cache_max_before = block_cache.cache_max
1656
1657             with tutil.mock_keep_responses(self.data, 200) as mock:
1658                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1659
1660             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1661
1662         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1663             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1664
1665         # shrank the cache in response to ENOSPC
1666         self.assertTrue(cache_max_before > block_cache.cache_max)
1667
1668     def test_disk_cache_retry_write_error2(self):
1669         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1670                                                   disk_cache_dir=self.disk_cache_dir)
1671
1672         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1673
1674         called = False
1675         realmmap = mmap.mmap
1676         def sideeffect_mmap(*args, **kwargs):
1677             nonlocal called
1678             if not called:
1679                 called = True
1680                 raise OSError(errno.ENOMEM, "no memory")
1681             else:
1682                 return realmmap(*args, **kwargs)
1683
1684         with patch('mmap.mmap') as mockmmap:
1685             mockmmap.side_effect = sideeffect_mmap
1686
1687             slots_before = block_cache._max_slots
1688
1689             with tutil.mock_keep_responses(self.data, 200) as mock:
1690                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1691
1692             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1693
1694         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1695             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1696
1697         # shrank the cache in response to ENOMEM
1698         self.assertTrue(slots_before > block_cache._max_slots)