import hashlib
import mock
import os
+import errno
import pycurl
import random
import re
import shutil
import socket
import sys
+import stat
+import tempfile
import time
import unittest
import urllib.parse
from . import keepstub
from . import run_test_server
-from .arvados_testutil import make_block_cache
+from .arvados_testutil import DiskCacheBase
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepTestCase(run_test_server.TestCaseWithServers):
+class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {}
+ block_cache_test = None
@classmethod
def setUpClass(cls):
super(KeepTestCase, cls).setUpClass()
run_test_server.authorize_with("admin")
cls.api_client = arvados.api('v1')
+ cls.block_cache_test = DiskCacheBase()
cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
proxy='', local_store='',
- block_cache=make_block_cache(cls.disk_cache))
+ block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))
+
+    @classmethod
+    def tearDownClass(cls):
+        # Mirror setUpClass(): run the inherited class-level teardown, then
+        # let the DiskCacheBase helper that built the block cache for
+        # cls.keep_client clean up after itself.  Calling setUpClass() here
+        # would repeat class setup and skip the inherited teardown entirely.
+        super(KeepTestCase, cls).tearDownClass()
+        cls.block_cache_test.tearDown()
def test_KeepBasicRWTest(self):
self.assertEqual(0, self.keep_client.upload_counter.get())
'wrong content from Keep.get for "test_head"')
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
+class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {'blob_signing': True}
+    def tearDown(self):
+        # Chain to the inherited teardown before cleaning up the block cache,
+        # the same way KeepProxyTestCase.tearDown does.  DiskCacheBase is the
+        # last base of this class, so its tearDown has to be invoked by name.
+        super(KeepPermissionTestCase, self).tearDown()
+        DiskCacheBase.tearDown(self)
+
def test_KeepBasicRWTest(self):
run_test_server.authorize_with('active')
- keep_client = arvados.KeepClient(block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
foo_locator = keep_client.put('foo')
self.assertRegex(
foo_locator,
unsigned_bar_locator)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepProxyTestCase(run_test_server.TestCaseWithServers):
+class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {}
def tearDown(self):
super(KeepProxyTestCase, self).tearDown()
+ DiskCacheBase.tearDown(self)
def test_KeepProxyTest1(self):
# Will use ARVADOS_KEEP_SERVICES environment variable that
# is set by setUpClass().
keep_client = arvados.KeepClient(api_client=self.api_client,
- local_store='', block_cache=make_block_cache(self.disk_cache))
+ local_store='', block_cache=self.make_block_cache(self.disk_cache))
baz_locator = keep_client.put('baz')
self.assertRegex(
baz_locator,
arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
keep_client = arvados.KeepClient(api_client=self.api_client,
local_store='',
- block_cache=make_block_cache(self.disk_cache))
+ block_cache=self.make_block_cache(self.disk_cache))
uris = [x['_service_root'] for x in keep_client._keep_services]
self.assertEqual(uris, ['http://10.0.0.1/',
'https://foo.example.org:1234/'])
with self.assertRaises(arvados.errors.ArgumentError):
keep_client = arvados.KeepClient(api_client=self.api_client,
local_store='',
- block_cache=make_block_cache(self.disk_cache))
+ block_cache=self.make_block_cache(self.disk_cache))
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
+class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
disk_cache = False
+ def tearDown(self):
+ # unittest.TestCase precedes DiskCacheBase in this class's bases, so
+ # DiskCacheBase.tearDown only runs when it is invoked by name here.
+ DiskCacheBase.tearDown(self)
+
def get_service_roots(self, api_client):
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
return [urllib.parse.urlparse(url) for url in sorted(services)]
def test_recognize_proxy_services_in_controller_response(self):
keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
service_type='proxy', service_host='localhost', service_port=9, count=1),
- block_cache=make_block_cache(self.disk_cache))
+ block_cache=self.make_block_cache(self.disk_cache))
try:
# this will fail, but it ensures we get the service
# discovery response
api_client.insecure = True
with tutil.mock_keep_responses(b'foo', 200) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
self.assertEqual(
mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
api_client.insecure = False
with tutil.mock_keep_responses(b'foo', 200) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
# getopt()==None here means we didn't change the
# default. If we were using real pycurl instead of a mock,
headers = {'X-Keep-Locator':local_loc}
with tutil.mock_keep_responses('', 200, **headers):
# Check that the translated locator gets returned
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
# Check that refresh_signature() uses the correct method and headers
keep_client._get_or_head = mock.MagicMock()
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepWriteError):
keep_client.put(b'foo')
self.assertEqual(
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.head('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.head('ffffffffffffffffffffffffffffffff')
self.assertEqual(
None)
def test_proxy_put_timeout(self):
+ self.disk_cache_dir = None
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
api_client = mock.MagicMock(name='api_client')
api_client.keep_services().accessible().execute.side_effect = (
arvados.errors.ApiError)
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(exc_class) as err_check:
getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
self.assertEqual(0, len(err_check.exception.request_errors()))
"retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
with req_mock, tutil.skip_sleep, \
self.assertRaises(exc_class) as err_check:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
num_retries=3)
self.assertEqual([502, 502], [
api_client = self.mock_keep_services(count=3)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
keep_client.put(data)
self.assertEqual(2, len(exc_check.exception.request_errors()))
api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
keep_client.put(data)
self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
self.assertEqual(0, len(exc_check.exception.request_errors()))
body = b'oddball service get'
api_client = self.mock_keep_services(service_type='fancynewblobstore')
with tutil.mock_keep_responses(body, 200):
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
actual = keep_client.get(tutil.str_keep_locator(body))
self.assertEqual(body, actual)
pdh = tutil.str_keep_locator(body)
api_client = self.mock_keep_services(service_type='fancynewblobstore')
with tutil.mock_keep_responses(pdh, 200):
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
actual = keep_client.put(body, copies=1)
self.assertEqual(pdh, actual)
headers = {'x-keep-replicas-stored': 3}
with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
**headers) as req_mock:
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
actual = keep_client.put(body, copies=2)
self.assertEqual(pdh, actual)
self.assertEqual(1, req_mock.call_count)
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
disk_cache = False
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
+ def tearDown(self):
+ # unittest.TestCase precedes DiskCacheBase in this class's bases, so
+ # DiskCacheBase.tearDown only runs when it is invoked by name here.
+ DiskCacheBase.tearDown(self)
+
@mock.patch('arvados.KeepClient.KeepService.get')
def test_get_request_cache(self, get_mock):
with tutil.mock_keep_responses(self.data, 200, 200):
# First reponse was not cached because it was from a HEAD request.
self.assertNotEqual(head_resp, get_resp)
+
+
+
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
+class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
disk_cache = False
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
+ def tearDown(self):
+ # unittest.TestCase precedes DiskCacheBase in this class's bases, so
+ # DiskCacheBase.tearDown only runs when it is invoked by name here.
+ DiskCacheBase.tearDown(self)
+
def test_multiple_default_storage_classes_req_header(self):
api_mock = self.api_client_mock()
api_mock.config.return_value = {
}
}
api_client = self.mock_keep_services(api_mock=api_mock, count=2)
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
resp_hdr = {
'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
'x-keep-replicas-stored': 1
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
+class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
disk_cache = False
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
self.test_id = arvados.util.new_request_id()
# id='123456789'>:
self.api_client.request_id = None
+ def tearDown(self):
+ # unittest.TestCase precedes DiskCacheBase in this class's bases, so
+ # DiskCacheBase.tearDown only runs when it is invoked by name here.
+ DiskCacheBase.tearDown(self)
+
def test_default_to_api_client_request_id(self):
self.api_client.request_id = self.test_id
with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
+class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
disk_cache = False
def setUp(self):
hashlib.md5(self.blocks[x]).hexdigest()
for x in range(len(self.expected_order))]
self.api_client = self.mock_keep_services(count=self.services)
- self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
+
+ def tearDown(self):
+ # unittest.TestCase precedes DiskCacheBase in this class's bases, so
+ # DiskCacheBase.tearDown only runs when it is invoked by name here.
+ DiskCacheBase.tearDown(self)
def test_weighted_service_roots_against_reference_set(self):
# Confirm weighted_service_roots() returns the correct order
hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
initial_services = 12
self.api_client = self.mock_keep_services(count=initial_services)
- self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
probes_before = [
self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
for added_services in range(1, 12):
api_client = self.mock_keep_services(count=initial_services+added_services)
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
total_penalty = 0
for hash_index in range(len(hashes)):
probe_after = keep_client.weighted_service_roots(
# Arbitrary port number:
aport = random.randint(1024,65535)
api_client = self.mock_keep_services(service_port=aport, count=self.services)
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with mock.patch('pycurl.Curl') as curl_mock, \
self.assertRaises(exc_class) as err_check:
curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
+class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
disk_cache = False
# BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
BANDWIDTH_LOW_LIM = 1024
TIMEOUT_TIME = 1.0
+    def tearDown(self):
+        # keepstub.StubKeepServers is the first base of this class, so any
+        # teardown it defines (presumably stopping the stub server -- TODO
+        # confirm against keepstub) is only reached through super(); an
+        # override that does not chain would silently skip it.
+        # DiskCacheBase comes after unittest.TestCase in the bases and
+        # therefore has to be invoked by name.
+        super(KeepClientTimeout, self).tearDown()
+        DiskCacheBase.tearDown(self)
+
class assertTakesBetween(unittest.TestCase):
def __init__(self, tmin, tmax):
self.tmin = tmin
def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
return arvados.KeepClient(
api_client=self.api_client,
- timeout=timeouts, block_cache=make_block_cache(self.disk_cache))
+ timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))
def test_timeout_slow_connect(self):
# Can't simulate TCP delays with our own socket. Leave our
kc.put(self.DATA, copies=1, num_retries=0)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
+class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
disk_cache = False
+ def tearDown(self):
+ # unittest.TestCase precedes DiskCacheBase in this class's bases, so
+ # DiskCacheBase.tearDown only runs when it is invoked by name here.
+ DiskCacheBase.tearDown(self)
+
def mock_disks_and_gateways(self, disks=3, gateways=1):
self.gateways = [{
'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
for gw in self.gateways]
self.api_client = self.mock_keep_services(
count=disks, additional_services=self.gateways)
- self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
+ self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
@mock.patch('pycurl.Curl')
def test_get_with_gateway_hint_first(self, MockCurl):
def new_client(self, **caller_kwargs):
kwargs = self.client_kwargs.copy()
kwargs.update(caller_kwargs)
- kwargs['block_cache'] = make_block_cache(self.disk_cache)
+ kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
return arvados.KeepClient(**kwargs)
def run_method(self, *args, **kwargs):
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
+class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
+    def tearDown(self):
+        # Chain first so that teardown defined by KeepClientRetryTestMixin
+        # (the first base) or unittest.TestCase is not hidden by this
+        # override; DiskCacheBase is the last base, so call it by name.
+        super(KeepClientRetryGetTestCase, self).tearDown()
+        DiskCacheBase.tearDown(self)
+
def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
*args, **kwargs):
return self.new_client().get(locator, *args, **kwargs)
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
+class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
DEFAULT_EXPECT = True
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
+    def tearDown(self):
+        # Chain first so that teardown defined by KeepClientRetryTestMixin
+        # (the first base) or unittest.TestCase is not hidden by this
+        # override; DiskCacheBase is the last base, so call it by name.
+        super(KeepClientRetryHeadTestCase, self).tearDown()
+        DiskCacheBase.tearDown(self)
+
def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
*args, **kwargs):
return self.new_client().head(locator, *args, **kwargs)
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
+class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
+    def tearDown(self):
+        # Chain first so that teardown defined by KeepClientRetryTestMixin
+        # (the first base) or unittest.TestCase is not hidden by this
+        # override; DiskCacheBase is the last base, so call it by name.
+        super(KeepClientRetryPutTestCase, self).tearDown()
+        DiskCacheBase.tearDown(self)
+
def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
copies=1, *args, **kwargs):
return self.new_client().put(data, copies, *args, **kwargs)
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
+class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
block_cache = False
# Test put()s that need two distinct servers to succeed, possibly
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
+
+ def tearDown(self):
+ # unittest.TestCase precedes DiskCacheBase in this class's bases, so
+ # DiskCacheBase.tearDown only runs when it is invoked by name here.
+ DiskCacheBase.tearDown(self)
def test_success_after_exception(self):
with tutil.mock_keep_responses(
self.assertEqual(2, req_mock.call_count)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepClientAPIErrorTest(unittest.TestCase):
+class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
disk_cache = False
+ def tearDown(self):
+ # unittest.TestCase precedes DiskCacheBase in this class's bases, so
+ # DiskCacheBase.tearDown only runs when it is invoked by name here.
+ DiskCacheBase.tearDown(self)
+
def test_api_fail(self):
class ApiMock(object):
def __getattr__(self, r):
raise arvados.errors.KeepReadError()
keep_client = arvados.KeepClient(api_client=ApiMock(),
proxy='', local_store='',
- block_cache=make_block_cache(self.disk_cache))
+ block_cache=self.make_block_cache(self.disk_cache))
# The bug this is testing for is that if an API (not
# keepstore) exception is thrown as part of a get(), the next
keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
+
+
+class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+    def setUp(self):
+        # Per-test fixture: the block payload and the locator the tests use
+        # for it, an API client from mock_keep_services(count=2), and a
+        # fresh temporary directory that serves as the disk cache location.
+        self.locator = '1271ed5ef305aadabc605b1609e24c52'
+        self.data = b'xyzzy'
+        self.api_client = self.mock_keep_services(count=2)
+        # Created last so that nothing is left behind on disk if building
+        # the API client mock fails.
+        self.disk_cache_dir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ # Remove the temporary cache directory created in setUp(), together
+ # with whatever the test left inside it.
+ shutil.rmtree(self.disk_cache_dir)
+
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_disk_cache_read(self, get_mock):
+        # Confirm that a block file already present in the cache directory
+        # is found by a cache that is initialized afterwards.
+        shard_dir = os.path.join(self.disk_cache_dir, self.locator[0:3])
+        os.makedirs(shard_dir)
+        block_path = os.path.join(shard_dir, self.locator+".keepcacheblock")
+        with open(block_path, "wb") as block_file:
+            block_file.write(self.data)
+
+        # The cache is only built now, after the block file exists.
+        cache = arvados.keep.KeepBlockCache(
+            disk_cache=True, disk_cache_dir=self.disk_cache_dir)
+        client = arvados.KeepClient(api_client=self.api_client, block_cache=cache)
+
+        fetched = client.get(self.locator)
+        self.assertTrue(tutil.binary_compare(fetched, self.data))
+
+        # The patched KeepService.get must not have been consulted.
+        get_mock.assert_not_called()
+
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_disk_cache_late_block(self, get_mock):
+        # Confirm the cache finds a block file that was written to the cache
+        # directory after the cache object was initialized.
+        #
+        # The method name has to be unique within the class: a later method
+        # with the same name replaces the earlier one in the class body, and
+        # the earlier test then silently never runs.
+
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir)
+        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+        shard_dir = os.path.join(self.disk_cache_dir, self.locator[0:3])
+        os.makedirs(shard_dir)
+        with open(os.path.join(shard_dir, self.locator+".keepcacheblock"), "wb") as f:
+            f.write(self.data)
+
+        # The lookup is expected to be satisfied from the file on disk ...
+        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+        # ... without calling the patched KeepService.get.
+        get_mock.assert_not_called()
+
+
+    def test_disk_cache_write(self):
+        # Confirm that fetching a block through the client leaves a cache
+        # file for that block on disk.
+        cache = arvados.keep.KeepBlockCache(
+            disk_cache=True, disk_cache_dir=self.disk_cache_dir)
+        client = arvados.KeepClient(api_client=self.api_client, block_cache=cache)
+
+        with tutil.mock_keep_responses(self.data, 200):
+            fetched = client.get(self.locator)
+            self.assertTrue(tutil.binary_compare(fetched, self.data))
+
+        self.assertIsNotNone(client.get_from_cache(self.locator))
+
+        block_path = os.path.join(
+            self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")
+        with open(block_path, "rb") as block_file:
+            on_disk = block_file.read()
+            self.assertTrue(tutil.binary_compare(on_disk, self.data))
+
+
+    def test_disk_cache_clean(self):
+        # Confirm that an old tmp*.keepcacheblock file is removed when a
+        # cache is initialized, while the other files are left alone.
+        shard_dir = os.path.join(self.disk_cache_dir, self.locator[0:3])
+        os.makedirs(shard_dir)
+
+        tmp_block = os.path.join(shard_dir, "tmpXYZABC.keepcacheblock")
+        tmp_plain = os.path.join(shard_dir, "tmpXYZABC")
+        plain = os.path.join(shard_dir, "XYZABC")
+        all_paths = (tmp_block, tmp_plain, plain)
+
+        for path, payload in zip(all_paths, (b"abc1", b"abc2", b"abc3")):
+            with open(path, "wb") as out:
+                out.write(payload)
+
+        for path in all_paths:
+            self.assertTrue(os.path.exists(path))
+
+        first_cache = arvados.keep.KeepBlockCache(
+            disk_cache=True, disk_cache_dir=self.disk_cache_dir)
+
+        # Nothing has been deleted yet, because every file was created
+        # within the last 60 seconds.
+        for path in all_paths:
+            self.assertTrue(os.path.exists(path))
+
+        # Push the timestamps of all three files 61s into the past.
+        for path in all_paths:
+            os.utime(path, times=(time.time()-61, time.time()-61))
+
+        second_cache = arvados.keep.KeepBlockCache(
+            disk_cache=True, disk_cache_dir=self.disk_cache_dir)
+
+        # Only the tmp*.keepcacheblock file should be gone now.
+        self.assertFalse(os.path.exists(tmp_block))
+        self.assertTrue(os.path.exists(tmp_plain))
+        self.assertTrue(os.path.exists(plain))
+
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_disk_cache_cap(self, get_mock):
+        # Confirm that a cache initialized with max_slots=1 trims the block
+        # files on disk down to that limit.
+        own_dir = os.path.join(self.disk_cache_dir, self.locator[0:3])
+        own_block = os.path.join(own_dir, self.locator+".keepcacheblock")
+        os.makedirs(own_dir)
+        with open(own_block, "wb") as out:
+            out.write(self.data)
+
+        foo_dir = os.path.join(self.disk_cache_dir, "acb")
+        foo_block = os.path.join(foo_dir, "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")
+        os.makedirs(foo_dir)
+        with open(foo_block, "wb") as out:
+            out.write(b"foo")
+
+        for path in (own_block, foo_block):
+            self.assertTrue(os.path.exists(path))
+
+        capped_cache = arvados.keep.KeepBlockCache(
+            disk_cache=True, disk_cache_dir=self.disk_cache_dir, max_slots=1)
+
+        # The block written first is the one expected to have been dropped.
+        self.assertFalse(os.path.exists(own_block))
+        self.assertTrue(os.path.exists(foo_block))
+
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_disk_cache_share(self, get_mock):
+        # Confirm that a second cache on the same directory does not delete
+        # block files that belong to the first cache.
+        own_dir = os.path.join(self.disk_cache_dir, self.locator[0:3])
+        own_block = os.path.join(own_dir, self.locator+".keepcacheblock")
+        os.makedirs(own_dir)
+        with open(own_block, "wb") as out:
+            out.write(self.data)
+
+        foo_dir = os.path.join(self.disk_cache_dir, "acb")
+        foo_block = os.path.join(foo_dir, "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")
+        os.makedirs(foo_dir)
+        with open(foo_block, "wb") as out:
+            out.write(b"foo")
+
+        both_blocks = (own_block, foo_block)
+        for path in both_blocks:
+            self.assertTrue(os.path.exists(path))
+
+        # First cache has room for both blocks.
+        first_cache = arvados.keep.KeepBlockCache(
+            disk_cache=True, disk_cache_dir=self.disk_cache_dir, max_slots=2)
+
+        for path in both_blocks:
+            self.assertTrue(os.path.exists(path))
+
+        # Second cache only has room for one, yet both files must survive.
+        second_cache = arvados.keep.KeepBlockCache(
+            disk_cache=True, disk_cache_dir=self.disk_cache_dir, max_slots=1)
+
+        for path in both_blocks:
+            self.assertTrue(os.path.exists(path))
+
+
+
+    def test_disk_cache_error(self):
+        # Leave the cache directory with owner-read permission only, so that
+        # initializing a disk cache in it is expected to fail with OSError.
+        # NOTE(review): presumably this relies on permission bits being
+        # enforced, i.e. on not running as root -- confirm.
+        os.chmod(self.disk_cache_dir, stat.S_IRUSR)
+
+        cache_kwargs = dict(disk_cache=True, disk_cache_dir=self.disk_cache_dir)
+        with self.assertRaises(OSError):
+            arvados.keep.KeepBlockCache(**cache_kwargs)
+
+
+    def test_disk_cache_write_error(self):
+        cache = arvados.keep.KeepBlockCache(
+            disk_cache=True, disk_cache_dir=self.disk_cache_dir)
+        client = arvados.KeepClient(api_client=self.api_client, block_cache=cache)
+
+        # Pre-create the subdirectory named after the locator prefix and
+        # make it owner-read-only.
+        shard_dir = os.path.join(self.disk_cache_dir, self.locator[0:3])
+        os.makedirs(shard_dir)
+        os.chmod(shard_dir, stat.S_IRUSR)
+
+        # The fetch is expected to surface the failure as KeepCacheError.
+        with self.assertRaises(arvados.errors.KeepCacheError), \
+                tutil.mock_keep_responses(self.data, 200):
+            client.get(self.locator)
+
+
+    @mock.patch('mmap.mmap')
+    def test_disk_cache_retry_write_error(self, mockmmap):
+        # mmap.mmap is patched to raise ENOSPC on its first call and to
+        # return the block data on the second.  The fetch is still expected
+        # to succeed and leave a cache file behind, and the cache is
+        # expected to have lowered cache_max in response.
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir)
+
+        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+        mockmmap.side_effect = (OSError(errno.ENOSPC, "no space"), self.data)
+
+        cache_max_before = block_cache.cache_max
+
+        # No "as" target: the value is unused, and binding it to "mock"
+        # would shadow the mock module inside this method.
+        with tutil.mock_keep_responses(self.data, 200):
+            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+        self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
+            self.assertTrue(tutil.binary_compare(f.read(), self.data))
+
+        # shrank the cache in response to ENOSPC; assertGreater reports
+        # both values when the check fails
+        self.assertGreater(cache_max_before, block_cache.cache_max)
+
+
+    @mock.patch('mmap.mmap')
+    def test_disk_cache_retry_write_error2(self, mockmmap):
+        # mmap.mmap is patched to raise ENOMEM on its first call and to
+        # return the block data on the second.  The fetch is still expected
+        # to succeed and leave a cache file behind, and the cache is
+        # expected to have lowered _max_slots in response.
+        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+                                                  disk_cache_dir=self.disk_cache_dir)
+
+        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+        mockmmap.side_effect = (OSError(errno.ENOMEM, "no memory"), self.data)
+
+        slots_before = block_cache._max_slots
+
+        # No "as" target: the value is unused, and binding it to "mock"
+        # would shadow the mock module inside this method.
+        with tutil.mock_keep_responses(self.data, 200):
+            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+        self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+
+        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
+            self.assertTrue(tutil.binary_compare(f.read(), self.data))
+
+        # shrank the cache in response to ENOMEM; assertGreater reports
+        # both values when the check fails
+        self.assertGreater(slots_before, block_cache._max_slots)