Merge branch '9766-register-workflow' closes #9766
[arvados.git] / services / fuse / tests / mount_test_base.py
1 import arvados
2 import arvados_fuse as fuse
3 import arvados.safeapi
4 import llfuse
5 import logging
6 import multiprocessing
7 import os
8 import run_test_server
9 import shutil
10 import signal
11 import subprocess
12 import sys
13 import tempfile
14 import threading
15 import time
16 import unittest
17
18 logger = logging.getLogger('arvados.arv-mount')
19
class MountTestBase(unittest.TestCase):
    """Base class for tests that exercise a FUSE mount served in-process.

    setUp() creates a scratch mount point (and, by default, a temporary
    directory exported as KEEP_LOCAL_STORE), make_mount() mounts a directory
    object there and serves it from a background llfuse thread, and
    tearDown() unmounts it and releases everything again.
    """

    def setUp(self, api=None, local_store=True):
        """Create the worker pool, scratch directories and API client.

        Arguments:
          api -- API client to hand to the mount.  When omitted (or falsy), a
            ThreadSafeApiCache built from arvados.config.settings() is used.
          local_store -- when true, create a temporary directory and export
            its path as KEEP_LOCAL_STORE; otherwise self.keeptmp is None and
            the environment is left alone.
        """
        # The underlying C implementation of open() makes a fstat() syscall
        # with the GIL still held.  When the GETATTR message comes back to
        # llfuse (which in these tests is in the same interpreter process) it
        # can't acquire the GIL, so it can't service the fstat() call, so it
        # deadlocks.  The workaround is to run some of our test code in a
        # separate process.  Fortunately the multiprocessing module makes this
        # relatively easy.
        self.pool = multiprocessing.Pool(1)

        if local_store:
            self.keeptmp = tempfile.mkdtemp()
            os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
        else:
            self.keeptmp = None
        self.mounttmp = tempfile.mkdtemp()
        run_test_server.run()
        run_test_server.authorize_with("admin")
        self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
        # Stays None until make_mount() is called; tearDown() uses it to
        # decide whether there is anything to unmount.
        self.llfuse_thread = None

    # This is a copy of Mount's method.  TODO: Refactor MountTestBase
    # to use a Mount instead of copying its code.
    def _llfuse_main(self):
        """Run the llfuse main loop, then close llfuse.

        If the main loop raises, llfuse is still closed -- but with
        unmount=False -- and the exception is re-raised.
        """
        try:
            llfuse.main()
        except:
            # Deliberately bare: whatever ended the loop, close llfuse
            # before propagating the exception unchanged.
            llfuse.close(unmount=False)
            raise
        llfuse.close()

    def make_mount(self, root_class, **root_kwargs):
        """Mount a root_class directory at self.mounttmp and return it.

        Arguments:
          root_class -- callable invoked as
            root_class(llfuse.ROOT_INODE, self.operations.inodes, self.api,
                       0, **root_kwargs)
            to build the root directory entry.
            NOTE(review): the literal 0 is passed positionally; its meaning
            is defined by root_class -- confirm against the directory classes.
          root_kwargs -- extra keyword arguments forwarded to root_class.

        Returns the entry stored under llfuse.ROOT_INODE in the inode table.
        """
        self.operations = fuse.Operations(
            os.getuid(), os.getgid(),
            api_client=self.api,
            enable_write=True)
        root_entry = root_class(
            llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs)
        self.operations.inodes.add_entry(root_entry)
        llfuse.init(self.operations, self.mounttmp, [])
        # Daemon thread, so a wedged mount cannot keep the interpreter alive.
        self.llfuse_thread = threading.Thread(target=self._llfuse_main)
        self.llfuse_thread.daemon = True
        self.llfuse_thread.start()
        # wait until the driver is finished initializing
        self.operations.initlock.wait()
        return self.operations.inodes[llfuse.ROOT_INODE]

    def _stop_llfuse_thread(self):
        """Unmount self.mounttmp and wait for the llfuse thread to finish.

        Does nothing when make_mount() was never called.  If the thread is
        still alive 10 seconds after the unmount request, the whole test
        process is killed with SIGKILL.
        """
        if not self.llfuse_thread:
            return
        # -u: unmount, -z: lazy unmount (see fusermount(1)).
        subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
        t0 = time.time()
        self.llfuse_thread.join(timeout=10)
        if self.llfuse_thread.is_alive():
            logger.warning("MountTestBase.tearDown():"
                           " llfuse thread still alive 10s after umount"
                           " -- exiting with SIGKILL")
            os.kill(os.getpid(), signal.SIGKILL)
        waited = time.time() - t0
        if waited > 0.1:
            logger.warning("MountTestBase.tearDown(): waited %f s for llfuse thread to end", waited)

    def _remove_local_store(self):
        """Delete the directory behind KEEP_LOCAL_STORE and unset the variable."""
        if not self.keeptmp:
            return
        try:
            shutil.rmtree(self.keeptmp)
        finally:
            # Never let this test's store path leak into later tests, and do
            # not fail if a test already removed the variable itself.
            os.environ.pop('KEEP_LOCAL_STORE', None)

    def tearDown(self):
        """Unmount and release everything setUp() and make_mount() created.

        The steps run in the same order as before (unmount, remove the mount
        point, remove the local store, reset the test server, shut down the
        pool), but each later step now runs even when an earlier one raises,
        so a failed unmount no longer leaks the worker process or the
        KEEP_LOCAL_STORE setting into subsequent tests.  The exception still
        propagates to the test runner.
        """
        try:
            self._stop_llfuse_thread()
            os.rmdir(self.mounttmp)
        finally:
            try:
                self._remove_local_store()
            finally:
                try:
                    run_test_server.reset()
                finally:
                    self.pool.close()
                    self.pool.join()

    def assertDirContents(self, subdir, expect_content):
        """Assert that a directory under the mount lists exactly expect_content.

        Arguments:
          subdir -- path relative to self.mounttmp; a falsy value means the
            mount point itself.
          expect_content -- iterable of expected entry names; order is
            ignored because both sides are sorted before comparison.
        """
        path = self.mounttmp
        if subdir:
            path = os.path.join(path, subdir)
        self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(path)))
92             path = os.path.join(path, subdir)
93         self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(path)))