import hashlib
import mock
import os
+import errno
import pycurl
import random
import re
+import shutil
import socket
import sys
+import stat
+import tempfile
import time
import unittest
import urllib.parse
+import parameterized
+
import arvados
import arvados.retry
import arvados.util
from . import keepstub
from . import run_test_server
-class KeepTestCase(run_test_server.TestCaseWithServers):
+from .arvados_testutil import DiskCacheBase
+
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
+ disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {}
+ block_cache_test = None
@classmethod
def setUpClass(cls):
super(KeepTestCase, cls).setUpClass()
run_test_server.authorize_with("admin")
cls.api_client = arvados.api('v1')
+ cls.block_cache_test = DiskCacheBase()
cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
- proxy='', local_store='')
+ proxy='', local_store='',
+ block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))
+
+ @classmethod
+ def tearDownClass(cls):
+ super(KeepTestCase, cls).setUpClass()
+ cls.block_cache_test.tearDown()
def test_KeepBasicRWTest(self):
self.assertEqual(0, self.keep_client.upload_counter.get())
self.assertEqual(6, self.keep_client.upload_counter.get())
self.assertEqual(0, self.keep_client.download_counter.get())
- self.assertEqual(self.keep_client.get(foo_locator),
- b'foo',
+ self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
+ b'foo'),
'wrong content from Keep.get(md5("foo"))')
self.assertEqual(3, self.keep_client.download_counter.get())
b'test_head',
'wrong content from Keep.get for "test_head"')
-class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
+ disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {'blob_signing': True}
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
+
def test_KeepBasicRWTest(self):
run_test_server.authorize_with('active')
- keep_client = arvados.KeepClient()
+ keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
foo_locator = keep_client.put('foo')
self.assertRegex(
foo_locator,
keep_client.get,
unsigned_bar_locator)
-
-class KeepProxyTestCase(run_test_server.TestCaseWithServers):
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
+ disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {}
KEEP_PROXY_SERVER = {}
cls.api_client = arvados.api('v1')
def tearDown(self):
- arvados.config.settings().pop('ARVADOS_EXTERNAL_CLIENT', None)
super(KeepProxyTestCase, self).tearDown()
+ DiskCacheBase.tearDown(self)
def test_KeepProxyTest1(self):
# Will use ARVADOS_KEEP_SERVICES environment variable that
# is set by setUpClass().
keep_client = arvados.KeepClient(api_client=self.api_client,
- local_store='')
+ local_store='', block_cache=self.make_block_cache(self.disk_cache))
baz_locator = keep_client.put('baz')
self.assertRegex(
baz_locator,
'wrong content from Keep.get(md5("baz"))')
self.assertTrue(keep_client.using_proxy)
- def test_KeepProxyTest2(self):
- # Don't instantiate the proxy directly, but set the X-External-Client
- # header. The API server should direct us to the proxy.
- arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
- keep_client = arvados.KeepClient(api_client=self.api_client,
- proxy='', local_store='')
- baz_locator = keep_client.put('baz2')
- self.assertRegex(
- baz_locator,
- '^91f372a266fe2bf2823cb8ec7fda31ce\+4',
- 'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
- self.assertEqual(keep_client.get(baz_locator),
- b'baz2',
- 'wrong content from Keep.get(md5("baz2"))')
- self.assertTrue(keep_client.using_proxy)
-
def test_KeepProxyTestMultipleURIs(self):
# Test using ARVADOS_KEEP_SERVICES env var overriding any
# existing proxy setting and setting multiple proxies
arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
keep_client = arvados.KeepClient(api_client=self.api_client,
- local_store='')
+ local_store='',
+ block_cache=self.make_block_cache(self.disk_cache))
uris = [x['_service_root'] for x in keep_client._keep_services]
self.assertEqual(uris, ['http://10.0.0.1/',
'https://foo.example.org:1234/'])
arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
with self.assertRaises(arvados.errors.ArgumentError):
keep_client = arvados.KeepClient(api_client=self.api_client,
- local_store='')
+ local_store='',
+ block_cache=self.make_block_cache(self.disk_cache))
+
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
+ disk_cache = False
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
-class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
def get_service_roots(self, api_client):
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
return [urllib.parse.urlparse(url) for url in sorted(services)]
self.assertEqual('100::1', service.hostname)
self.assertEqual(10, service.port)
+ def test_recognize_proxy_services_in_controller_response(self):
+ keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
+ service_type='proxy', service_host='localhost', service_port=9, count=1),
+ block_cache=self.make_block_cache(self.disk_cache))
+ try:
+ # this will fail, but it ensures we get the service
+ # discovery response
+ keep_client.put('baz2')
+ except:
+ pass
+ self.assertTrue(keep_client.using_proxy)
+
def test_insecure_disables_tls_verify(self):
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
api_client.insecure = True
with tutil.mock_keep_responses(b'foo', 200) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
self.assertEqual(
mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
0)
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
+ 0)
api_client.insecure = False
with tutil.mock_keep_responses(b'foo', 200) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
# getopt()==None here means we didn't change the
# default. If we were using real pycurl instead of a mock,
self.assertEqual(
mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
None)
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
+ None)
def test_refresh_signature(self):
blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
headers = {'X-Keep-Locator':local_loc}
with tutil.mock_keep_responses('', 200, **headers):
# Check that the translated locator gets returned
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
# Check that refresh_signature() uses the correct method and headers
keep_client._get_or_head = mock.MagicMock()
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepWriteError):
keep_client.put(b'foo')
self.assertEqual(
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.head('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.head('ffffffffffffffffffffffffffffffff')
self.assertEqual(
None)
def test_proxy_put_timeout(self):
+ self.disk_cache_dir = None
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
api_client = mock.MagicMock(name='api_client')
api_client.keep_services().accessible().execute.side_effect = (
arvados.errors.ApiError)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with self.assertRaises(exc_class) as err_check:
getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
self.assertEqual(0, len(err_check.exception.request_errors()))
"retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
with req_mock, tutil.skip_sleep, \
self.assertRaises(exc_class) as err_check:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
num_retries=3)
self.assertEqual([502, 502], [
api_client = self.mock_keep_services(count=3)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
keep_client.put(data)
self.assertEqual(2, len(exc_check.exception.request_errors()))
api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
keep_client.put(data)
self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
self.assertEqual(0, len(exc_check.exception.request_errors()))
body = b'oddball service get'
api_client = self.mock_keep_services(service_type='fancynewblobstore')
with tutil.mock_keep_responses(body, 200):
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
actual = keep_client.get(tutil.str_keep_locator(body))
self.assertEqual(body, actual)
pdh = tutil.str_keep_locator(body)
api_client = self.mock_keep_services(service_type='fancynewblobstore')
with tutil.mock_keep_responses(pdh, 200):
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
actual = keep_client.put(body, copies=1)
self.assertEqual(pdh, actual)
headers = {'x-keep-replicas-stored': 3}
with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
**headers) as req_mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
actual = keep_client.put(body, copies=2)
self.assertEqual(pdh, actual)
self.assertEqual(1, req_mock.call_count)
-
@tutil.skip_sleep
-class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
+ disk_cache = False
+
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
+
@mock.patch('arvados.KeepClient.KeepService.get')
def test_get_request_cache(self, get_mock):
with tutil.mock_keep_responses(self.data, 200, 200):
def test_head_and_then_get_return_different_responses(self, get_mock):
head_resp = None
get_resp = None
- get_mock.side_effect = ['first response', 'second response']
+ get_mock.side_effect = [b'first response', b'second response']
with tutil.mock_keep_responses(self.data, 200, 200):
head_resp = self.keep_client.head(self.locator)
get_resp = self.keep_client.get(self.locator)
- self.assertEqual('first response', head_resp)
+ self.assertEqual(b'first response', head_resp)
# First reponse was not cached because it was from a HEAD request.
self.assertNotEqual(head_resp, get_resp)
+
+
+
@tutil.skip_sleep
-class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
+ disk_cache = False
+
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
+
def test_multiple_default_storage_classes_req_header(self):
api_mock = self.api_client_mock()
api_mock.config.return_value = {
}
}
api_client = self.mock_keep_services(api_mock=api_mock, count=2)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
resp_hdr = {
'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
'x-keep-replicas-stored': 1
self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
@tutil.skip_sleep
-class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
+ disk_cache = False
+
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
self.test_id = arvados.util.new_request_id()
# id='123456789'>:
self.api_client.request_id = None
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
+
def test_default_to_api_client_request_id(self):
self.api_client.request_id = self.test_id
with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
self.keep_client.head(self.locator)
self.assertAutomaticRequestId(mock.responses[0])
+ def test_request_id_in_exception(self):
+ with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
+ with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
+ self.keep_client.head(self.locator, request_id=self.test_id)
+
+ with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
+ with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
+ self.keep_client.get(self.locator)
+
+ with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
+ with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
+ self.keep_client.put(self.data, request_id=self.test_id)
+
+ with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
+ with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
+ self.keep_client.put(self.data)
+
def assertAutomaticRequestId(self, resp):
hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
if x.startswith('X-Request-Id: ')][0]
@tutil.skip_sleep
-class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
+#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
+ disk_cache = False
def setUp(self):
# expected_order[i] is the probe order for
hashlib.md5(self.blocks[x]).hexdigest()
for x in range(len(self.expected_order))]
self.api_client = self.mock_keep_services(count=self.services)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
+
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
def test_weighted_service_roots_against_reference_set(self):
# Confirm weighted_service_roots() returns the correct order
hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
initial_services = 12
self.api_client = self.mock_keep_services(count=initial_services)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
probes_before = [
self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
for added_services in range(1, 12):
api_client = self.mock_keep_services(count=initial_services+added_services)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
total_penalty = 0
for hash_index in range(len(hashes)):
probe_after = keep_client.weighted_service_roots(
# Arbitrary port number:
aport = random.randint(1024,65535)
api_client = self.mock_keep_services(service_port=aport, count=self.services)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
with mock.patch('pycurl.Curl') as curl_mock, \
self.assertRaises(exc_class) as err_check:
curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
def test_put_error_shows_probe_order(self):
self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
+ disk_cache = False
-class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
# BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
# 1s worth of data and then trigger bandwidth errors before running
# out of data.
BANDWIDTH_LOW_LIM = 1024
TIMEOUT_TIME = 1.0
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
+
class assertTakesBetween(unittest.TestCase):
def __init__(self, tmin, tmax):
self.tmin = tmin
def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
return arvados.KeepClient(
api_client=self.api_client,
- timeout=timeouts)
+ timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))
def test_timeout_slow_connect(self):
# Can't simulate TCP delays with our own socket. Leave our
with self.assertRaises(arvados.errors.KeepWriteError):
kc.put(self.DATA, copies=1, num_retries=0)
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
+ disk_cache = False
+
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
-class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
def mock_disks_and_gateways(self, disks=3, gateways=1):
self.gateways = [{
'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
for gw in self.gateways]
self.api_client = self.mock_keep_services(
count=disks, additional_services=self.gateways)
- self.keepClient = arvados.KeepClient(api_client=self.api_client)
+ self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
@mock.patch('pycurl.Curl')
def test_get_with_gateway_hint_first(self, MockCurl):
self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
MockCurl.return_value.getopt(pycurl.URL).decode())
-
class KeepClientRetryTestMixin(object):
+ disk_cache = False
+
# Testing with a local Keep store won't exercise the retry behavior.
# Instead, our strategy is:
# * Create a client with one proxy specified (pointed at a black
def new_client(self, **caller_kwargs):
kwargs = self.client_kwargs.copy()
kwargs.update(caller_kwargs)
+ kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
return arvados.KeepClient(**kwargs)
def run_method(self, *args, **kwargs):
@tutil.skip_sleep
-class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
+
def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
*args, **kwargs):
return self.new_client().get(locator, *args, **kwargs)
self.check_success(locator=self.HINTED_LOCATOR)
@tutil.skip_sleep
-class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
DEFAULT_EXPECT = True
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
+
def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
*args, **kwargs):
return self.new_client().head(locator, *args, **kwargs)
self.check_success(locator=self.HINTED_LOCATOR)
@tutil.skip_sleep
-class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
+
def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
copies=1, *args, **kwargs):
return self.new_client().put(data, copies, *args, **kwargs)
def last_result(self):
if self.will_succeed:
return self._result
+ else:
+ return {"status_code": 500, "body": "didn't succeed"}
def finished(self):
return False
@tutil.skip_sleep
-class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
+ block_cache = False
+
# Test put()s that need two distinct servers to succeed, possibly
# requiring multiple passes through the retry loop.
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
+
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
def test_success_after_exception(self):
with tutil.mock_keep_responses(
self.keep_client.put('foo', num_retries=1, copies=2)
self.assertEqual(2, req_mock.call_count)
-class KeepClientAPIErrorTest(unittest.TestCase):
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
+ disk_cache = False
+
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
+
def test_api_fail(self):
class ApiMock(object):
def __getattr__(self, r):
else:
raise arvados.errors.KeepReadError()
keep_client = arvados.KeepClient(api_client=ApiMock(),
- proxy='', local_store='')
+ proxy='', local_store='',
+ block_cache=self.make_block_cache(self.disk_cache))
# The bug this is testing for is that if an API (not
# keepstore) exception is thrown as part of a get(), the next
keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
+
+
+class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+ def setUp(self):
+ self.api_client = self.mock_keep_services(count=2)
+ self.data = b'xyzzy'
+ self.locator = '1271ed5ef305aadabc605b1609e24c52'
+ self.disk_cache_dir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.disk_cache_dir)
+
+
+ @mock.patch('arvados.KeepClient.KeepService.get')
+ def test_disk_cache_read(self, get_mock):
+ # confirm it finds an existing cache block when the cache is
+ # initialized.
+
+ os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+ with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
+ f.write(self.data)
+
+ # block cache should have found the existing block
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir)
+ keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+ self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+ get_mock.assert_not_called()
+
+
+ @mock.patch('arvados.KeepClient.KeepService.get')
+ def test_disk_cache_share(self, get_mock):
+ # confirm it finds a cache block written after the disk cache
+ # was initialized.
+
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir)
+ keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+ os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+ with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
+ f.write(self.data)
+
+ # when we try to get the block, it'll check the disk and find it.
+ self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+ get_mock.assert_not_called()
+
+
+ def test_disk_cache_write(self):
+ # confirm the cache block was created
+
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir)
+ keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+ with tutil.mock_keep_responses(self.data, 200) as mock:
+ self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+ self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+
+ with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
+ self.assertTrue(tutil.binary_compare(f.read(), self.data))
+
+
+ def test_disk_cache_clean(self):
+ # confirm that a tmp file in the cache is cleaned up
+
+ os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+ with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
+ f.write(b"abc1")
+
+ with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
+ f.write(b"abc2")
+
+ with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
+ f.write(b"abc3")
+
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
+
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir)
+
+ # The tmp still hasn't been deleted because it was created in the last 60 seconds
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
+
+ # Set the mtime to 61s in the past
+ os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
+ os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
+ os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
+
+ block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir)
+
+ # Tmp should be gone but the other ones are safe.
+ self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
+
+
+ @mock.patch('arvados.KeepClient.KeepService.get')
+ def test_disk_cache_cap(self, get_mock):
+ # confirm that the cache is kept to the desired limit
+
+ os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+ with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
+ f.write(self.data)
+
+ os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
+ with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
+ f.write(b"foo")
+
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
+
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir,
+ max_slots=1)
+
+ self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
+
+
+ @mock.patch('arvados.KeepClient.KeepService.get')
+ def test_disk_cache_share(self, get_mock):
+ # confirm that a second cache doesn't delete files that belong to the first cache.
+
+ os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+ with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
+ f.write(self.data)
+
+ os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
+ with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
+ f.write(b"foo")
+
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
+
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir,
+ max_slots=2)
+
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
+
+ block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir,
+ max_slots=1)
+
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
+ self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
+
+
+
+ def test_disk_cache_error(self):
+ os.chmod(self.disk_cache_dir, stat.S_IRUSR)
+
+ # Fail during cache initialization.
+ with self.assertRaises(OSError):
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir)
+
+
+ def test_disk_cache_write_error(self):
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir)
+
+ keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+ # Make the cache dir read-only
+ os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+ os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
+
+ # Cache fails
+ with self.assertRaises(arvados.errors.KeepCacheError):
+ with tutil.mock_keep_responses(self.data, 200) as mock:
+ keep_client.get(self.locator)
+
+
+ @mock.patch('mmap.mmap')
+ def test_disk_cache_retry_write_error(self, mockmmap):
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir)
+
+ keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+ mockmmap.side_effect = (OSError(errno.ENOSPC, "no space"), self.data)
+
+ cache_max_before = block_cache.cache_max
+
+ with tutil.mock_keep_responses(self.data, 200) as mock:
+ self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+ self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+
+ with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
+ self.assertTrue(tutil.binary_compare(f.read(), self.data))
+
+ # shrank the cache in response to ENOSPC
+ self.assertTrue(cache_max_before > block_cache.cache_max)
+
+
+ @mock.patch('mmap.mmap')
+ def test_disk_cache_retry_write_error2(self, mockmmap):
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir)
+
+ keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+ mockmmap.side_effect = (OSError(errno.ENOMEM, "no memory"), self.data)
+
+ slots_before = block_cache._max_slots
+
+ with tutil.mock_keep_responses(self.data, 200) as mock:
+ self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+ self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+
+ with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
+ self.assertTrue(tutil.binary_compare(f.read(), self.data))
+
+ # shrank the cache in response to ENOMEM
+ self.assertTrue(slots_before > block_cache._max_slots)