import hashlib
import mock
import os
+import errno
import pycurl
import random
import re
import shutil
import socket
import sys
+import stat
+import tempfile
import time
import unittest
import urllib.parse
try:
# this will fail, but it ensures we get the service
# discovery response
- keep_client.put('baz2')
+ keep_client.put('baz2', num_retries=0)
except:
pass
self.assertTrue(keep_client.using_proxy)
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
with self.assertRaises(arvados.errors.KeepWriteError):
keep_client.put(b'foo')
self.assertEqual(
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.head('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.head('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ num_retries=0,
+ )
with self.assertRaises(arvados.errors.KeepWriteError):
keep_client.put('foo')
self.assertEqual(
api_client = mock.MagicMock(name='api_client')
api_client.keep_services().accessible().execute.side_effect = (
arvados.errors.ApiError)
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
with self.assertRaises(exc_class) as err_check:
getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
self.assertEqual(0, len(err_check.exception.request_errors()))
"retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
with req_mock, tutil.skip_sleep, \
self.assertRaises(exc_class) as err_check:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
num_retries=3)
self.assertEqual([502, 502], [
api_client = self.mock_keep_services(count=3)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
keep_client.put(data)
self.assertEqual(2, len(exc_check.exception.request_errors()))
api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
- keep_client.put(data)
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
+ keep_client.put(data)
self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
self.assertEqual(0, len(exc_check.exception.request_errors()))
body = b'oddball service get'
api_client = self.mock_keep_services(service_type='fancynewblobstore')
with tutil.mock_keep_responses(body, 200):
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
actual = keep_client.get(tutil.str_keep_locator(body))
self.assertEqual(body, actual)
pdh = tutil.str_keep_locator(body)
api_client = self.mock_keep_services(service_type='fancynewblobstore')
with tutil.mock_keep_responses(pdh, 200):
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
actual = keep_client.put(body, copies=1)
self.assertEqual(pdh, actual)
headers = {'x-keep-replicas-stored': 3}
with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
**headers) as req_mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(
+ api_client=api_client,
+ block_cache=self.make_block_cache(self.disk_cache),
+ num_retries=0,
+ )
actual = keep_client.put(body, copies=2)
self.assertEqual(pdh, actual)
self.assertEqual(1, req_mock.call_count)
# First reponse was not cached because it was from a HEAD request.
self.assertNotEqual(head_resp, get_resp)
+
+
+
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
'x-keep-storage-classes-confirmed': 'foo=1'}
with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
with self.assertRaises(arvados.errors.KeepWriteError):
- self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
+ self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'], num_retries=0)
# 1st request, both classes pending
req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
- self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
+ self.keep_client.put(self.data, copies=w_copies, classes=w_classes, num_retries=0)
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
self.check_success(num_retries=3)
- def test_no_default_retry(self):
- with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
- self.check_exception()
-
def test_no_retry_after_permanent_error(self):
with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
self.check_exception(num_retries=3)
# and a high threshold of servers report that it's not found.
# This test rigs up 50/50 disagreement between two servers, and
# checks that it does not become a NotFoundError.
- client = self.new_client()
+ client = self.new_client(num_retries=0)
with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
client.get(self.HINTED_LOCATOR)
# and a high threshold of servers report that it's not found.
# This test rigs up 50/50 disagreement between two servers, and
# checks that it does not become a NotFoundError.
- client = self.new_client()
+ client = self.new_client(num_retries=0)
with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
client.head(self.HINTED_LOCATOR)
keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
+
+
+class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+    """Exercise KeepBlockCache's disk-backed cache.
+
+    Covers: finding pre-existing cache blocks, sharing blocks between
+    cache instances, writing new blocks, cleaning stale temp files,
+    enforcing the slot cap, and error/retry handling when the cache
+    directory is unwritable or mmap fails.
+    """
+
+    def setUp(self):
+        self.api_client = self.mock_keep_services(count=2)
+        self.data = b'xyzzy'
+        self.locator = '1271ed5ef305aadabc605b1609e24c52'
+        self.disk_cache_dir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        # Some tests deliberately drop write permission on cache
+        # directories (see test_disk_cache_error and
+        # test_disk_cache_write_error); restore it first so rmtree
+        # can actually clean up.
+        os.chmod(self.disk_cache_dir, stat.S_IRWXU)
+        for root, dirs, _files in os.walk(self.disk_cache_dir):
+            for d in dirs:
+                os.chmod(os.path.join(root, d), stat.S_IRWXU)
+        shutil.rmtree(self.disk_cache_dir)
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_disk_cache_read(self, get_mock):
+        # confirm it finds an existing cache block when the cache is
+        # initialized.
+
+        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
+            f.write(self.data)
+
+        # block cache should have found the existing block
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir)
+        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+        # the block must have been served from disk, not Keep
+        get_mock.assert_not_called()
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_disk_cache_share(self, get_mock):
+        # confirm it finds a cache block written after the disk cache
+        # was initialized.
+
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir)
+        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
+            f.write(self.data)
+
+        # when we try to get the block, it'll check the disk and find it.
+        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+        # the block must have been served from disk, not Keep
+        get_mock.assert_not_called()
+
+    def test_disk_cache_write(self):
+        # confirm the cache block was created
+
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir)
+        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+        with tutil.mock_keep_responses(self.data, 200) as mock:
+            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+        self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+
+        # the fetched block should now exist on disk
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
+            self.assertTrue(tutil.binary_compare(f.read(), self.data))
+
+    def test_disk_cache_clean(self):
+        # confirm that a tmp file in the cache is cleaned up
+
+        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
+            f.write(b"abc1")
+
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
+            f.write(b"abc2")
+
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
+            f.write(b"abc3")
+
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
+
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir)
+
+        # The tmp still hasn't been deleted because it was created in the last 60 seconds
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
+
+        # Set the mtime to 61s in the past
+        then = time.time() - 61
+        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(then, then))
+        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(then, then))
+        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(then, then))
+
+        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                   disk_cache_dir=self.disk_cache_dir)
+
+        # Tmp should be gone but the other ones are safe.
+        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_disk_cache_cap(self, get_mock):
+        # confirm that the cache is kept to the desired limit
+
+        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
+            f.write(self.data)
+
+        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
+        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
+            f.write(b"foo")
+
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
+
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir,
+                                                  max_slots=1)
+
+        # with only one slot, the older block should have been evicted
+        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_disk_cache_share_across_instances(self, get_mock):
+        # NOTE(review): renamed from test_disk_cache_share, which
+        # collided with the earlier method of the same name and
+        # silently shadowed it so only one of the two ever ran.
+        #
+        # confirm that a second cache doesn't delete files that belong to the first cache.
+
+        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
+            f.write(self.data)
+
+        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
+        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
+            f.write(b"foo")
+
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
+
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir,
+                                                  max_slots=2)
+
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
+
+        # the second cache has max_slots=1 but must not evict blocks
+        # held by the first cache.
+        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                   disk_cache_dir=self.disk_cache_dir,
+                                                   max_slots=1)
+
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
+        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
+
+    def test_disk_cache_error(self):
+        # a read-only cache dir should make initialization fail
+        os.chmod(self.disk_cache_dir, stat.S_IRUSR)
+
+        # Fail during cache initialization.
+        with self.assertRaises(OSError):
+            block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                      disk_cache_dir=self.disk_cache_dir)
+
+    def test_disk_cache_write_error(self):
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir)
+
+        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+        # Make the cache dir read-only
+        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+        os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
+
+        # Cache fails
+        with self.assertRaises(arvados.errors.KeepCacheError):
+            with tutil.mock_keep_responses(self.data, 200) as mock:
+                keep_client.get(self.locator)
+
+    @mock.patch('mmap.mmap')
+    def test_disk_cache_retry_write_error(self, mockmmap):
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir)
+
+        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+        # first mmap attempt fails with ENOSPC, second succeeds
+        mockmmap.side_effect = (OSError(errno.ENOSPC, "no space"), self.data)
+
+        cache_max_before = block_cache.cache_max
+
+        with tutil.mock_keep_responses(self.data, 200) as mock:
+            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+        self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
+            self.assertTrue(tutil.binary_compare(f.read(), self.data))
+
+        # shrank the cache in response to ENOSPC
+        self.assertTrue(cache_max_before > block_cache.cache_max)
+
+    @mock.patch('mmap.mmap')
+    def test_disk_cache_retry_write_error2(self, mockmmap):
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir)
+
+        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+        # first mmap attempt fails with ENOMEM, second succeeds
+        mockmmap.side_effect = (OSError(errno.ENOMEM, "no memory"), self.data)
+
+        slots_before = block_cache._max_slots
+
+        with tutil.mock_keep_responses(self.data, 200) as mock:
+            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+        self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
+            self.assertTrue(tutil.binary_compare(f.read(), self.data))
+
+        # shrank the slot count in response to ENOMEM
+        self.assertTrue(slots_before > block_cache._max_slots)