1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import absolute_import
8 import arvados_fuse as fuse
12 import multiprocessing
14 from . import run_test_server
24 logger = logging.getLogger('arvados.arv-mount')
26 from .integration_test import workerPool
28 def make_block_cache(disk_cache):
30 disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
31 shutil.rmtree(disk_cache_dir, ignore_errors=True)
32 block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
35 class MountTestBase(unittest.TestCase):
38 def setUp(self, api=None, local_store=True):
39 # The underlying C implementation of open() makes a fstat() syscall
40 # with the GIL still held. When the GETATTR message comes back to
41 # llfuse (which in these tests is in the same interpreter process) it
42 # can't acquire the GIL, so it can't service the fstat() call, so it
43 # deadlocks. The workaround is to run some of our test code in a
44 # separate process. Forturnately the multiprocessing module makes this
47 self.pool = workerPool()
49 self.keeptmp = tempfile.mkdtemp()
50 os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
53 self.mounttmp = tempfile.mkdtemp()
55 run_test_server.authorize_with("admin")
57 self.api = api if api else arvados.safeapi.ThreadSafeApiCache(
58 arvados.config.settings(),
59 keep_params={"block_cache": make_block_cache(self.disk_cache)},
62 self.llfuse_thread = None
64 # This is a copy of Mount's method. TODO: Refactor MountTestBase
65 # to use a Mount instead of copying its code.
66 def _llfuse_main(self):
70 llfuse.close(unmount=False)
74 def make_mount(self, root_class, **root_kwargs):
76 if 'enable_write' in root_kwargs:
77 enable_write = root_kwargs.pop('enable_write')
78 self.operations = fuse.Operations(
79 os.getuid(), os.getgid(),
81 enable_write=enable_write)
82 self.operations.inodes.add_entry(root_class(
83 llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, enable_write, **root_kwargs))
84 llfuse.init(self.operations, self.mounttmp, [])
85 self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
86 self.llfuse_thread.daemon = True
87 self.llfuse_thread.start()
88 # wait until the driver is finished initializing
89 self.operations.initlock.wait()
90 return self.operations.inodes[llfuse.ROOT_INODE]
93 if self.llfuse_thread:
94 if self.operations.events:
95 self.operations.events.close(timeout=10)
96 subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
98 self.llfuse_thread.join(timeout=10)
99 if self.llfuse_thread.is_alive():
100 logger.warning("MountTestBase.tearDown():"
101 " llfuse thread still alive 10s after umount"
102 " -- exiting with SIGKILL")
103 os.kill(os.getpid(), signal.SIGKILL)
104 waited = time.time() - t0
106 logger.warning("MountTestBase.tearDown(): waited %f s for llfuse thread to end", waited)
108 os.rmdir(self.mounttmp)
110 shutil.rmtree(self.keeptmp)
111 os.environ.pop('KEEP_LOCAL_STORE')
112 run_test_server.reset()
114 def assertDirContents(self, subdir, expect_content):
117 path = os.path.join(path, subdir)
118 self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(str(path))))