from .integration_test import workerPool
def make_block_cache(disk_cache):
    """Return a fresh ``arvados.keep.KeepBlockCache`` for a test run.

    Args:
        disk_cache: Passed straight through as ``KeepBlockCache``'s
            ``disk_cache`` argument.  When truthy, the per-user cache
            directory ``~/.cache/arvados/keep`` is deleted first.

    Returns:
        The newly constructed ``KeepBlockCache`` instance.
    """
    if disk_cache:
        disk_cache_dir = os.path.join(
            os.path.expanduser("~"), ".cache", "arvados", "keep")
        # Best-effort wipe: ignore_errors=True means a missing directory
        # (e.g. first run) is not treated as a failure.
        shutil.rmtree(disk_cache_dir, ignore_errors=True)
    # Build the cache only AFTER the directory has been removed.
    # NOTE(review): presumably KeepBlockCache sets up / scans its on-disk
    # directory at construction time, so creating it before the rmtree
    # would leave it pointing at deleted state -- confirm against the SDK.
    return arvados.keep.KeepBlockCache(disk_cache=disk_cache)
class MountTestBase(unittest.TestCase):
- block_cache = False
+ disk_cache = False
def setUp(self, api=None, local_store=True):
# The underlying C implementation of open() makes a fstat() syscall
run_test_server.run()
run_test_server.authorize_with("admin")
- self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings(), keep_params={"block_cache": make_block_cache(self.block_cache)})
+ self.api = api if api else arvados.safeapi.ThreadSafeApiCache(
+ arvados.config.settings(),
+ keep_params={"block_cache": make_block_cache(self.disk_cache)},
+ version='v1',
+ )
self.llfuse_thread = None
# This is a copy of Mount's method. TODO: Refactor MountTestBase