1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
7 import arvados_fuse as fuse
11 import multiprocessing
24 from . import run_test_server
25 from .integration_test import workerPool
27 logger = logging.getLogger('arvados.arv-mount')
29 def make_block_cache(disk_cache):
31 disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
32 shutil.rmtree(disk_cache_dir, ignore_errors=True)
33 block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
36 class MountTestBase(unittest.TestCase):
39 def setUp(self, api=None, local_store=True):
40 # The underlying C implementation of open() makes a fstat() syscall
41 # with the GIL still held. When the GETATTR message comes back to
42 # llfuse (which in these tests is in the same interpreter process) it
43 # can't acquire the GIL, so it can't service the fstat() call, so it
44 # deadlocks. The workaround is to run some of our test code in a
45 # separate process. Forturnately the multiprocessing module makes this
48 self.pool = workerPool()
50 self.keeptmp = tempfile.mkdtemp()
51 os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
54 self.mounttmp = tempfile.mkdtemp()
56 run_test_server.authorize_with("admin")
58 self.api = api if api else arvados.safeapi.ThreadSafeApiCache(
59 arvados.config.settings(),
60 keep_params={"block_cache": make_block_cache(self.disk_cache)},
63 self.llfuse_thread = None
65 # This is a copy of Mount's method. TODO: Refactor MountTestBase
66 # to use a Mount instead of copying its code.
67 def _llfuse_main(self):
71 llfuse.close(unmount=False)
75 def make_mount(self, root_class, **root_kwargs):
76 enable_write = root_kwargs.pop('enable_write', True)
77 self.operations = fuse.Operations(
81 enable_write=enable_write,
83 self.operations.inodes.add_entry(root_class(
85 self.operations.inodes,
89 root_kwargs.pop('filters', None),
92 llfuse.init(self.operations, self.mounttmp, [])
93 self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
94 self.llfuse_thread.daemon = True
95 self.llfuse_thread.start()
96 # wait until the driver is finished initializing
97 self.operations.initlock.wait()
98 return self.operations.inodes[llfuse.ROOT_INODE]
101 if self.llfuse_thread:
102 if self.operations.events:
103 self.operations.events.close(timeout=10)
104 subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
106 self.llfuse_thread.join(timeout=60)
107 if self.llfuse_thread.is_alive():
108 # pytest uses exit status 2 when test collection failed.
109 # A UnitTest failing in setup/teardown counts as a
110 # collection failure, so pytest will exit with status 2
111 # no matter what status you specify here. run-tests.sh
112 # looks for this status, so specify 2 just to keep
113 # everything as consistent as possible.
114 # TODO: If we refactor these tests so they're not built
115 # on unittest, consider using a dedicated, non-pytest
116 # exit code like TEMPFAIL.
117 pytest.exit("llfuse thread outlived test - aborting test suite to avoid deadlock", 2)
118 waited = time.time() - t0
120 logger.warning("MountTestBase.tearDown(): waited %f s for llfuse thread to end", waited)
122 os.rmdir(self.mounttmp)
124 shutil.rmtree(self.keeptmp)
125 os.environ.pop('KEEP_LOCAL_STORE')
126 run_test_server.reset()
128 def assertDirContents(self, subdir, expect_content):
131 path = os.path.join(path, subdir)
132 self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(str(path))))