1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
7 import arvados_fuse as fuse
11 import multiprocessing
24 from . import run_test_server
25 from .integration_test import workerPool
27 logger = logging.getLogger('arvados.arv-mount')
29 class MountTestBase(unittest.TestCase):
35 cls._disk_cache_dir = tempfile.mkdtemp(prefix='MountTest-')
37 cls._disk_cache_dir = None
38 cls._keep_block_cache = arvados.keep.KeepBlockCache(
39 disk_cache=cls.disk_cache,
40 disk_cache_dir=cls._disk_cache_dir,
43 def setUp(self, api=None, local_store=True):
44 # The underlying C implementation of open() makes a fstat() syscall
45 # with the GIL still held. When the GETATTR message comes back to
46 # llfuse (which in these tests is in the same interpreter process) it
47 # can't acquire the GIL, so it can't service the fstat() call, so it
48 # deadlocks. The workaround is to run some of our test code in a
49 # separate process. Fortunately the multiprocessing module makes this
52 self.pool = workerPool()
54 self.keeptmp = tempfile.mkdtemp()
55 os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
58 self.mounttmp = tempfile.mkdtemp()
60 run_test_server.authorize_with("admin")
62 self.api = api if api else arvados.safeapi.ThreadSafeApiCache(
63 arvados.config.settings(),
64 keep_params={"block_cache": self._keep_block_cache},
67 self.llfuse_thread = None
70 def tearDownClass(cls):
71 if cls._disk_cache_dir:
72 shutil.rmtree(cls._disk_cache_dir)
74 # This is a copy of Mount's method. TODO: Refactor MountTestBase
75 # to use a Mount instead of copying its code.
76 def _llfuse_main(self):
80 llfuse.close(unmount=False)
84 def make_mount(self, root_class, **root_kwargs):
85 enable_write = root_kwargs.pop('enable_write', True)
86 self.operations = fuse.Operations(
90 enable_write=enable_write,
92 self.operations.inodes.add_entry(root_class(
94 self.operations.inodes,
98 root_kwargs.pop('filters', None),
101 llfuse.init(self.operations, self.mounttmp, [])
102 self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
103 self.llfuse_thread.daemon = True
104 self.llfuse_thread.start()
105 # wait until the driver is finished initializing
106 self.operations.initlock.wait()
107 return self.operations.inodes[llfuse.ROOT_INODE]
110 if self.llfuse_thread:
111 if self.operations.events:
112 self.operations.events.close(timeout=10)
113 subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
115 self.llfuse_thread.join(timeout=60)
116 if self.llfuse_thread.is_alive():
117 # pytest uses exit status 2 when test collection failed.
118 # A UnitTest failing in setup/teardown counts as a
119 # collection failure, so pytest will exit with status 2
120 # no matter what status you specify here. run-tests.sh
121 # looks for this status, so specify 2 just to keep
122 # everything as consistent as possible.
123 # TODO: If we refactor these tests so they're not built
124 # on unittest, consider using a dedicated, non-pytest
125 # exit code like TEMPFAIL.
126 pytest.exit("llfuse thread outlived test - aborting test suite to avoid deadlock", 2)
127 waited = time.time() - t0
129 logger.warning("MountTestBase.tearDown(): waited %f s for llfuse thread to end", waited)
131 os.rmdir(self.mounttmp)
133 shutil.rmtree(self.keeptmp)
134 os.environ.pop('KEEP_LOCAL_STORE')
135 run_test_server.reset()
137 def assertDirContents(self, subdir, expect_content):
140 path = os.path.join(path, subdir)
141 self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(str(path))))