#
# SPDX-License-Identifier: AGPL-3.0
+from __future__ import absolute_import
import arvados
+import arvados.keep
import arvados_fuse as fuse
import arvados.safeapi
import llfuse
import logging
import multiprocessing
import os
-import run_test_server
+from . import run_test_server
import shutil
import signal
import subprocess
logger = logging.getLogger('arvados.arv-mount')
+from .integration_test import workerPool
+
+def make_block_cache(disk_cache):
+ if disk_cache:
+ disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+ shutil.rmtree(disk_cache_dir, ignore_errors=True)
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
+ return block_cache
+
class MountTestBase(unittest.TestCase):
+ disk_cache = False
+
def setUp(self, api=None, local_store=True):
# The underlying C implementation of open() makes a fstat() syscall
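# with the GIL still held. When the GETATTR message comes back to
# llfuse (which in these tests runs in a thread of the same interpreter
# process), it cannot acquire the GIL to service the fstat() call, so it
# deadlocks. The workaround is to run some of our test code in a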
# separate process. Fortunately the multiprocessing module makes this
# relatively easy.
- self.pool = multiprocessing.Pool(1)
+ self.pool = workerPool()
if local_store:
self.keeptmp = tempfile.mkdtemp()
os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
self.mounttmp = tempfile.mkdtemp()
run_test_server.run()
run_test_server.authorize_with("admin")
- self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+
+ self.api = api if api else arvados.safeapi.ThreadSafeApiCache(
+ arvados.config.settings(),
+ keep_params={"block_cache": make_block_cache(self.disk_cache)},
+ version='v1',
+ )
self.llfuse_thread = None
# This is a copy of Mount's method. TODO: Refactor MountTestBase
llfuse.close()
def make_mount(self, root_class, **root_kwargs):
+ enable_write = True
+ if 'enable_write' in root_kwargs:
+ enable_write = root_kwargs.pop('enable_write')
self.operations = fuse.Operations(
os.getuid(), os.getgid(),
api_client=self.api,
- enable_write=True)
+ enable_write=enable_write)
self.operations.inodes.add_entry(root_class(
- llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
+ llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, enable_write, **root_kwargs))
llfuse.init(self.operations, self.mounttmp, [])
self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
self.llfuse_thread.daemon = True
shutil.rmtree(self.keeptmp)
os.environ.pop('KEEP_LOCAL_STORE')
run_test_server.reset()
- self.pool.close()
- self.pool.join()
def assertDirContents(self, subdir, expect_content):
path = self.mounttmp
if subdir:
path = os.path.join(path, subdir)
- self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(path)))
+ self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(str(path))))