21700: Install Bundler system-wide in Rails postinst
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import os
14 import errno
15 import pycurl
16 import random
17 import re
18 import shutil
19 import socket
20 import sys
21 import stat
22 import tempfile
23 import time
24 import unittest
25 import urllib.parse
26 import mmap
27
28 from unittest import mock
29 from unittest.mock import patch
30
31 import parameterized
32
33 import arvados
34 import arvados.retry
35 import arvados.util
36 from . import arvados_testutil as tutil
37 from . import keepstub
38 from . import run_test_server
39
40 from .arvados_testutil import DiskCacheBase
41
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip put/get/head tests against a live Keep server.

    Parameterized to run twice: once with the disk-backed block cache
    and once with the in-memory block cache.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # Holds the DiskCacheBase helper so tearDownClass can clean up the
    # cache directory it created.
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # Bug fix: this previously called super().setUpClass(), which
        # re-ran class setup during teardown instead of tearing the
        # test servers down.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        # Doubling 8 bytes 23 times yields a 64 MiB block (2**23 * 8),
        # matching the +67108864 size hint asserted below.
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        # The empty string has a well-known md5 and zero length.
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
151
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Exercise blob-signing enforcement on a Keep server.

    Signed locators round-trip; unsigned locators, other users'
    tokens, and missing tokens are all rejected on read.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))

        # A signed put/get round trip succeeds.
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(
            b'foo',
            keep_client.get(foo_locator),
            'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => bad request
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get(unsigned_bar_locator)

        # GET from a different user => bad request
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.KeepReadError):
            arvados.Keep.get(bar_locator)

        # Unauthenticated GET => bad request, whether or not the
        # locator carries a signature.
        keep_client.api_token = ''
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get(bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get(unsigned_bar_locator)
199
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Verify KeepClient honors the ARVADOS_KEEP_SERVICES proxy setting."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # setUpClass() exports ARVADOS_KEEP_SERVICES; the client should
        # route traffic through the proxy it names.
        client = arvados.KeepClient(
            api_client=self.api_client,
            local_store='',
            block_cache=self.make_block_cache(self.disk_cache))
        locator = client.put('baz')
        self.assertRegex(
            locator,
            '^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + locator)
        self.assertEqual(
            b'baz',
            client.get(locator),
            'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # ARVADOS_KEEP_SERVICES overrides any existing proxy setting and
        # may list several proxies separated by spaces.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        client = arvados.KeepClient(
            api_client=self.api_client,
            local_store='',
            block_cache=self.make_block_cache(self.disk_cache))
        roots = [svc['_service_root'] for svc in client._keep_services]
        self.assertEqual(
            ['http://10.0.0.1/', 'https://foo.example.org:1234/'],
            roots)

    def test_KeepProxyTestInvalidURI(self):
        # A bare hostname (no scheme) is not an acceptable service URI.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(
                api_client=self.api_client,
                local_store='',
                block_cache=self.make_block_cache(self.disk_cache))
249
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Unit tests for KeepClient service discovery, TLS settings,
    timeout configuration, and error aggregation.

    All API/Keep traffic is mocked (tutil.ApiClientMock /
    tutil.mock_keep_responses); no real servers are contacted.
    """
    disk_cache = False

    def tearDown(self):
        # Remove any disk cache directory created by make_block_cache().
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        # Helper: the parsed service-root URLs a KeepClient would try for
        # a fixed all-zeros locator, in sorted order.
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        # service_ssl_flag in the discovery response selects the scheme.
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        # An IPv6 host must be bracketed in the URL so the port parses out.
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
                                         block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2', num_retries=0)
        except:
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")

        # insecure=True must turn off both peer and host verification.
        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        # A +R (remote) signature hint should be exchanged for a local +A
        # signature via a HEAD request carrying X-Keep-Signature.
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            # DEFAULT_TIMEOUT is (connect, low-speed time, low-speed limit).
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        # HEAD transfers no body, so no low-speed limits are set.
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        # Proxy services get the (longer) DEFAULT_PROXY_TIMEOUT values.
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        # NOTE(review): unlike the sibling tests, this one constructs the
        # KeepClient without a block_cache and clears disk_cache_dir first
        # -- presumably deliberate, but worth confirming.
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        # Helper: when service discovery itself fails, the resulting
        # Keep error should carry zero per-request errors.
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            num_retries=0,
        )
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        # Helper: only the final retry round's errors (the two 502s)
        # should be reported, across 4 total attempts.
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        # One of three services succeeds; only the two failures should
        # appear in the aggregated write error.
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        # With only a read-only proxy available, put() should fail before
        # issuing any requests at all.
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        # Unknown service types should still be usable for reads.
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        # An x-keep-replicas-stored header >= copies means a single
        # successful request satisfies the replication requirement.
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
585
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Block-cache behavior: GET responses are cached, HEADs are not."""
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, mocked_get):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # The second get() is served from the cache, so only one
        # backend request should have gone out.
        mocked_get.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, mocked_get):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # HEAD responses are never cached (so they can't be confused
        # with GET responses); each head() hits the backend.
        self.assertEqual(2, mocked_get.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, mocked_get):
        mocked_get.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # The first response was not cached because it came from a HEAD
        # request, so the subsequent get() received a fresh one.
        self.assertNotEqual(head_resp, get_resp)
627
628
629
630
631
632 @tutil.skip_sleep
633 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
634 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
635     disk_cache = False
636
    def setUp(self):
        """Build a KeepClient over mocked services and a known request ID."""
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        # Sanity-check the generated ID matches the documented format.
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None
648
    def tearDown(self):
        # Remove any disk cache directory created by make_block_cache().
        DiskCacheBase.tearDown(self)
651
    def test_default_to_api_client_request_id(self):
        """put/get/head should inherit the API client's request_id."""
        self.api_client.request_id = self.test_id
        # put() writes to both mocked services, hence two responses.
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(mock.responses[0])
667
668     def test_explicit_request_id(self):
669         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
670             self.keep_client.put(self.data, request_id=self.test_id)
671         self.assertEqual(2, len(mock.responses))
672         for resp in mock.responses:
673             self.assertProvidedRequestId(resp)
674
675         with tutil.mock_keep_responses(self.data, 200) as mock:
676             self.keep_client.get(self.locator, request_id=self.test_id)
677         self.assertProvidedRequestId(mock.responses[0])
678
679         with tutil.mock_keep_responses(b'', 200) as mock:
680             self.keep_client.head(self.locator, request_id=self.test_id)
681         self.assertProvidedRequestId(mock.responses[0])
682
683     def test_automatic_request_id(self):
684         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
685             self.keep_client.put(self.data)
686         self.assertEqual(2, len(mock.responses))
687         for resp in mock.responses:
688             self.assertAutomaticRequestId(resp)
689
690         with tutil.mock_keep_responses(self.data, 200) as mock:
691             self.keep_client.get(self.locator)
692         self.assertAutomaticRequestId(mock.responses[0])
693
694         with tutil.mock_keep_responses(b'', 200) as mock:
695             self.keep_client.head(self.locator)
696         self.assertAutomaticRequestId(mock.responses[0])
697
698     def test_request_id_in_exception(self):
699         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
700             with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
701                 self.keep_client.head(self.locator, request_id=self.test_id)
702
703         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
704             with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
705                 self.keep_client.get(self.locator)
706
707         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
708             with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
709                 self.keep_client.put(self.data, request_id=self.test_id)
710
711         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
712             with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
713                 self.keep_client.put(self.data)
714
715     def assertAutomaticRequestId(self, resp):
716         hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
717                if x.startswith('X-Request-Id: ')][0]
718         self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
719         self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
720
721     def assertProvidedRequestId(self, resp):
722         self.assertIn('X-Request-Id: '+self.test_id,
723                       resp.getopt(pycurl.HTTPHEADER))
724
725
@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check KeepClient's service probe order against a fixed reference set.

    Each block hash maps to a specific ordering of the 16 mocked Keep
    services; these tests assert that weighted_service_roots(), get,
    head and put all visit services in exactly that order, so any
    change to the rendezvous hashing algorithm shows up as a failure.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # One test block per reference ordering, plus its md5 digest.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # Make every mocked service return 500 so op() must visit all
        # of them, then compare the URLs actually requested with the
        # reference order.  Callers pass num_retries=1, so each
        # service is probed twice -- hence expected_order[i]*2.
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        # With copies>1, multiple writer threads race, so the observed
        # request order may deviate slightly from the reference order;
        # the invariant below bounds how far it can deviate.
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        # Adding services should perturb the probe order only
        # slightly: for each block, measure how far the old
        # first-choice server moves down the new probe list, and bound
        # the average penalty across 100 blocks.
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        # Run `verb` on the all-zeros block with every service
        # failing, and confirm the raised error lists the failed
        # servers in the reference probe order.
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
884
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Timeout behavior of KeepClient against a stub Keep server whose
    per-request delays and bandwidth can be tuned via setdelays() and
    setbandwidth().
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        # Context manager asserting that its with-block takes between
        # tmin and tmax seconds of wall-clock time.  Subclasses
        # unittest.TestCase only to borrow its assert* methods.
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager asserting that its with-block takes at least
        # tmin seconds of wall-clock time.
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Build a client with explicit timeouts.  Based on the helper
        # comments below, the tuple is (connect timeout, response
        # timeout[, low-speed limit]) -- TODO confirm against
        # KeepClient's documentation.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # With bandwidth above the low-speed limit, both put and get
        # should succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            # NOTE(review): head is expected to succeed (no
            # assertRaises) while get/put fail above -- presumably
            # because HEAD transfers no body; confirm against the
            # client implementation.
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError) as e:
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1031
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Exercise locator +K@ hints that route requests through gateway
    services or remote-cluster keep proxies."""
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Build a service list mixing ordinary disk services with
        # `gateway:test` services, then point a fresh client at it.
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # A single gateway hint should send both get and head straight
        # to that gateway's root URL.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    def _check_gateway_hints_in_order(self, MockCurl, verb, body):
        # Every service 404s; confirm the probe order is all hinted
        # gateways (in hint order) followed by the disk services.
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=body)
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for curl in mocks[gateways:]:
            self.assertRegex(curl.getopt(pycurl.URL).decode(), r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get', '')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head', b'')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        # A +K@<cluster-id> hint routes the request to that cluster's
        # public keep proxy hostname.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1133
class KeepClientRetryTestMixin(object):
    """Shared machinery for testing KeepClient retry behavior.

    A local Keep store would never exercise retries, so instead we:
    * create a client pointed at a single black-hole proxy, so no API
      client is needed and every HTTP request goes to one place;
    * mock the HTTP layer to script the sequence of responses.
    This lets the retry logic be tested thoroughly with no supporting
    servers and no side effects if something hiccups.
    Subclasses must define DEFAULT_EXPECT, DEFAULT_EXCEPTION,
    run_method(), and TEST_PATCHER (a callable that mocks out the
    client's transport).
    """
    disk_cache = False

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        # Caller-supplied kwargs override the defaults from setUp().
        kwargs = dict(self.client_kwargs, **caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        # Run the operation and confirm its return value.
        expected = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        # Run the operation, requiring it to raise error_class; the
        # assertion context is returned for further inspection.
        error_class = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is not retryable, so the eventual 200 is never reached.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # One retry allows two attempts; both 500, so the error must
        # report the attempt count.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given at construction applies when the call
        # itself doesn't specify one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1204
1205
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 should surface as the specific NotFoundError.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise NotFoundError only when a high threshold of
        # servers report the block missing.  Rig up 50/50 disagreement
        # between two servers and confirm the error stays generic.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on the first server should not be fatal;
        # the next server's good response wins.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # A body that doesn't match the locator's checksum counts as a
        # failure, and the next server is tried.
        with tutil.mock_keep_responses(
                ('baddata', 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1253
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head()."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 should surface as the specific NotFoundError.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise NotFoundError only when a high threshold
        # of servers report the block missing.  Rig up 50/50
        # disagreement between two servers and confirm the error stays
        # generic.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on the first server should not be fatal;
        # the next server's good response wins.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1295
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.put(), with and without a disk cache."""
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        # The mixin's shared retry tests call this to exercise put().
        return self.new_client().put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # With only one mocked server available, asking for two copies
        # must fail rather than storing both on the same server.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1313
1314
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Check that KeepWriterThreadPool stops at the requested replica count."""

    class FakeKeepService(object):
        """Stand-in Keep service: put() sleeps `delay` seconds, then either
        raises `will_raise`, or returns `will_succeed`."""

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            # Failed services report a 500-ish result instead of headers.
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data = 'foo',
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies
        )

    def test_only_write_enough_on_success(self):
        # All ten services could succeed, but the pool should stop once
        # `copies` replicas are stored.
        for step in range(10):
            self.pool.add_task(self.FakeKeepService(delay=step/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Alternate failing and succeeding services; the successes alone
        # must still reach the requested replica count.
        for step in range(5):
            self.pool.add_task(self.FakeKeepService(delay=step/10.0, will_succeed=False), None)
            self.pool.add_task(self.FakeKeepService(delay=step/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Same as above, but the failing services raise instead of
        # returning False.
        for step in range(5):
            self.pool.add_task(self.FakeKeepService(delay=step/10.0, will_raise=Exception()), None)
            self.pool.add_task(self.FakeKeepService(delay=step/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # With copies+1 crashes and only copies-1 possible successes, the
        # pool cannot reach the target and reports what it did store.
        for step in range(self.copies+1):
            self.pool.add_task(self.FakeKeepService(delay=step/10.0, will_raise=Exception()), None)
        for step in range(self.copies-1):
            self.pool.add_task(self.FakeKeepService(delay=step/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies-1, []))
1386
1387
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Default when the parameterized_class decorator does not supply it.
    # FIX: this attribute was previously misspelled `block_cache = False`;
    # setUp() reads self.disk_cache, and every sibling test class declares
    # `disk_cache = False`, so the misnamed attribute provided no default.
    disk_cache = False

    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # One request dies with an exception; the two following 200s let
        # put() reach copies=2 on the retry pass (3 requests total).
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # A 500 is retryable, so the client should issue a third request
        # and succeed.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1427
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        # API client stub that knows just enough to let KeepClient be
        # constructed; touching any other attribute raises KeepReadError.
        class ApiMock(object):
            def __getattr__(self, name):
                known = {
                    "api_token": "abc",
                    "insecure": False,
                    "config": lambda: {},
                }
                if name in known:
                    return known[name]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row.  Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.

        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1462
1463
1464 class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
1465     def setUp(self):
1466         self.api_client = self.mock_keep_services(count=2)
1467         self.data = b'xyzzy'
1468         self.locator = '1271ed5ef305aadabc605b1609e24c52'
1469         self.disk_cache_dir = tempfile.mkdtemp()
1470
    def tearDown(self):
        # Remove the scratch cache directory and everything the tests or
        # the disk cache wrote into it.
        shutil.rmtree(self.disk_cache_dir)
1473
1474
1475     @mock.patch('arvados.KeepClient.KeepService.get')
1476     def test_disk_cache_read(self, get_mock):
1477         # confirm it finds an existing cache block when the cache is
1478         # initialized.
1479
1480         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1481         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1482             f.write(self.data)
1483
1484         # block cache should have found the existing block
1485         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1486                                                   disk_cache_dir=self.disk_cache_dir)
1487         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1488
1489         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1490
1491         get_mock.assert_not_called()
1492
1493
1494     @mock.patch('arvados.KeepClient.KeepService.get')
1495     def test_disk_cache_share(self, get_mock):
1496         # confirm it finds a cache block written after the disk cache
1497         # was initialized.
1498
1499         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1500                                                   disk_cache_dir=self.disk_cache_dir)
1501         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1502
1503         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1504         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1505             f.write(self.data)
1506
1507         # when we try to get the block, it'll check the disk and find it.
1508         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1509
1510         get_mock.assert_not_called()
1511
1512
1513     def test_disk_cache_write(self):
1514         # confirm the cache block was created
1515
1516         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1517                                                   disk_cache_dir=self.disk_cache_dir)
1518         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1519
1520         with tutil.mock_keep_responses(self.data, 200) as mock:
1521             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1522
1523         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1524
1525         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1526             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1527
1528
1529     def test_disk_cache_clean(self):
1530         # confirm that a tmp file in the cache is cleaned up
1531
1532         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1533         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1534             f.write(b"abc1")
1535
1536         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1537             f.write(b"abc2")
1538
1539         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1540             f.write(b"abc3")
1541
1542         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1543         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1544         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1545
1546         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1547                                                   disk_cache_dir=self.disk_cache_dir)
1548
1549         # The tmp still hasn't been deleted because it was created in the last 60 seconds
1550         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1551         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1552         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1553
1554         # Set the mtime to 61s in the past
1555         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1556         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1557         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1558
1559         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1560                                                    disk_cache_dir=self.disk_cache_dir)
1561
1562         # Tmp should be gone but the other ones are safe.
1563         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1564         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1565         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1566
1567
1568     @mock.patch('arvados.KeepClient.KeepService.get')
1569     def test_disk_cache_cap(self, get_mock):
1570         # confirm that the cache is kept to the desired limit
1571
1572         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1573         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1574             f.write(self.data)
1575
1576         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1577         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1578             f.write(b"foo")
1579
1580         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1581         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1582
1583         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1584                                                    disk_cache_dir=self.disk_cache_dir,
1585                                                    max_slots=1)
1586
1587         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1588         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1589
1590
1591     @mock.patch('arvados.KeepClient.KeepService.get')
1592     def test_disk_cache_share(self, get_mock):
1593         # confirm that a second cache doesn't delete files that belong to the first cache.
1594
1595         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1596         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1597             f.write(self.data)
1598
1599         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1600         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1601             f.write(b"foo")
1602
1603         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1604         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1605
1606         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1607                                                    disk_cache_dir=self.disk_cache_dir,
1608                                                    max_slots=2)
1609
1610         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1611         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1612
1613         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1614                                                    disk_cache_dir=self.disk_cache_dir,
1615                                                    max_slots=1)
1616
1617         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1618         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1619
1620
1621
    def test_disk_cache_error(self):
        # Strip write/execute permission from the cache directory so the
        # cache cannot create anything inside it.
        os.chmod(self.disk_cache_dir, stat.S_IRUSR)

        # Fail during cache initialization.
        with self.assertRaises(OSError):
            block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                      disk_cache_dir=self.disk_cache_dir)
1629
1630
    def test_disk_cache_write_error(self):
        # If the disk cache cannot write a fetched block to disk, get()
        # should raise KeepCacheError rather than failing silently.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Make the cache dir read-only
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)

        # Cache fails
        with self.assertRaises(arvados.errors.KeepCacheError):
            with tutil.mock_keep_responses(self.data, 200) as mock:
                keep_client.get(self.locator)
1645
1646
    def test_disk_cache_retry_write_error(self):
        # When mmap fails once with ENOSPC (disk full), the client should
        # recover: the get() still succeeds, the block ends up in the cache
        # and on disk, and the cache responds by reducing cache_max.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # mmap.mmap replacement: raises ENOSPC on the first call only, then
        # delegates to the real mmap for all later calls.
        called = False
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            nonlocal called
            if not called:
                called = True
                raise OSError(errno.ENOSPC, "no space")
            else:
                return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            cache_max_before = block_cache.cache_max

            # get() succeeds despite the injected first-attempt failure.
            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        # The block was still written out to the cache file on disk.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

        # shrank the cache in response to ENOSPC
        self.assertTrue(cache_max_before > block_cache.cache_max)
1678
1679
    def test_disk_cache_retry_write_error2(self):
        # Companion to test_disk_cache_retry_write_error: when mmap fails
        # once with ENOMEM, get() should still succeed and the cache should
        # respond by reducing its slot count (_max_slots) instead of
        # cache_max.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # mmap.mmap replacement: raises ENOMEM on the first call only, then
        # delegates to the real mmap for all later calls.
        called = False
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            nonlocal called
            if not called:
                called = True
                raise OSError(errno.ENOMEM, "no memory")
            else:
                return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            slots_before = block_cache._max_slots

            # get() succeeds despite the injected first-attempt failure.
            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        # The block was still written out to the cache file on disk.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

        # shrank the cache in response to ENOMEM
        self.assertTrue(slots_before > block_cache._max_slots)