Merge branch '12430-output-glob'
[arvados.git] / services / fuse / tests / mount_test_base.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados
6 import arvados.keep
7 import arvados_fuse as fuse
8 import arvados.safeapi
9 import llfuse
10 import logging
11 import multiprocessing
12 import os
13 import shutil
14 import signal
15 import subprocess
16 import sys
17 import tempfile
18 import threading
19 import time
20 import unittest
21
22 from . import run_test_server
23 from .integration_test import workerPool
24
25 logger = logging.getLogger('arvados.arv-mount')
26
def make_block_cache(disk_cache):
    """Return a new ``arvados.keep.KeepBlockCache`` for use by a test.

    When ``disk_cache`` is truthy, ``~/.cache/arvados/keep`` is deleted first
    (errors ignored) -- presumably so that blocks cached on disk by an
    earlier run cannot satisfy reads in this one.  The flag is then passed
    through unchanged to the ``KeepBlockCache`` constructor.
    """
    if disk_cache:
        home = os.path.expanduser("~")
        stale_cache_dir = os.path.join(home, ".cache", "arvados", "keep")
        shutil.rmtree(stale_cache_dir, ignore_errors=True)
    return arvados.keep.KeepBlockCache(disk_cache=disk_cache)
33
class MountTestBase(unittest.TestCase):
    """Shared fixture for tests that need a live FUSE mount.

    ``setUp`` creates the temporary directories, calls
    ``run_test_server.run()`` and builds an API client.  ``make_mount``
    mounts a root directory object at ``self.mounttmp`` with the llfuse main
    loop running on a daemon thread.  ``tearDown`` unmounts and removes
    everything ``setUp`` created.
    """

    # Forwarded to make_block_cache() when setUp() builds its own API client.
    disk_cache = False

    def setUp(self, api=None, local_store=True):
        """Create temp dirs, start the test server and set up ``self.api``.

        Args:
            api: API client to use.  When falsy, a
                ``arvados.safeapi.ThreadSafeApiCache`` is built from the
                current ``arvados.config.settings()``.
            local_store: when true, a temp dir is created and exported as
                ``KEEP_LOCAL_STORE``; otherwise ``self.keeptmp`` is ``None``.
        """
        # The underlying C implementation of open() makes a fstat() syscall
        # with the GIL still held.  When the GETATTR message comes back to
        # llfuse (which in these tests is in the same interpreter process) it
        # can't acquire the GIL, so it can't service the fstat() call, so it
        # deadlocks.  The workaround is to run some of our test code in a
        # separate process.  Fortunately the multiprocessing module makes this
        # relatively easy.

        self.pool = workerPool()
        if local_store:
            self.keeptmp = tempfile.mkdtemp()
            os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
        else:
            self.keeptmp = None
        self.mounttmp = tempfile.mkdtemp()
        run_test_server.run()
        run_test_server.authorize_with("admin")

        self.api = api if api else arvados.safeapi.ThreadSafeApiCache(
            arvados.config.settings(),
            keep_params={"block_cache": make_block_cache(self.disk_cache)},
            version='v1',
        )
        # Set by make_mount(); tearDown() uses it to tell whether anything
        # was actually mounted.
        self.llfuse_thread = None

    # This is a copy of Mount's method.  TODO: Refactor MountTestBase
    # to use a Mount instead of copying its code.
    def _llfuse_main(self):
        """Run the llfuse main loop, then close llfuse.

        If the loop raises, llfuse is closed with ``unmount=False`` and the
        exception is re-raised.
        """
        try:
            llfuse.main()
        except BaseException:
            # Same reach as a bare ``except:``; the exception is re-raised
            # after closing.
            llfuse.close(unmount=False)
            raise
        llfuse.close()

    def make_mount(self, root_class, **root_kwargs):
        """Mount a ``root_class`` instance at ``self.mounttmp``.

        Args:
            root_class: callable that builds the root directory entry.  It is
                called as ``root_class(llfuse.ROOT_INODE, inodes, api, 0,
                enable_write, filters, **root_kwargs)``.
            **root_kwargs: extra keyword arguments for ``root_class``.
                ``enable_write`` (default ``True``) and ``filters`` (default
                ``None``) are consumed here and passed positionally.

        Returns:
            The entry stored under ``llfuse.ROOT_INODE`` once the driver has
            finished initializing.
        """
        enable_write = root_kwargs.pop('enable_write', True)
        # 'filters' is passed positionally, so it must be removed from
        # root_kwargs before the remaining keywords are expanded.
        filters = root_kwargs.pop('filters', None)
        self.operations = fuse.Operations(
            os.getuid(),
            os.getgid(),
            api_client=self.api,
            enable_write=enable_write,
        )
        root_entry = root_class(
            llfuse.ROOT_INODE,
            self.operations.inodes,
            self.api,
            0,
            enable_write,
            filters,
            **root_kwargs,
        )
        self.operations.inodes.add_entry(root_entry)
        llfuse.init(self.operations, self.mounttmp, [])
        self.llfuse_thread = threading.Thread(target=self._llfuse_main)
        self.llfuse_thread.daemon = True
        self.llfuse_thread.start()
        # wait until the driver is finished initializing
        self.operations.initlock.wait()
        return self.operations.inodes[llfuse.ROOT_INODE]

    def tearDown(self):
        """Unmount (if mounted) and remove everything ``setUp`` created.

        The Keep temp dir, the ``KEEP_LOCAL_STORE`` environment variable and
        the test server reset are handled in ``finally`` clauses, so a
        failure while unmounting or removing the mount point cannot leave
        them behind for the next test.  Any such failure still propagates.
        """
        try:
            if self.llfuse_thread:
                if self.operations.events:
                    self.operations.events.close(timeout=10)
                subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
                # Monotonic clock: the measured wait is not affected by
                # wall-clock adjustments.
                t0 = time.monotonic()
                self.llfuse_thread.join(timeout=60)
                if self.llfuse_thread.is_alive():
                    logger.warning("MountTestBase.tearDown():"
                                   " llfuse thread still alive 60s after umount"
                                   " -- exiting with SIGKILL")
                    os.kill(os.getpid(), signal.SIGKILL)
                waited = time.monotonic() - t0
                if waited > 0.1:
                    logger.warning("MountTestBase.tearDown(): waited %f s for llfuse thread to end", waited)

            os.rmdir(self.mounttmp)
        finally:
            try:
                if self.keeptmp:
                    # Drop the variable first so that a failing rmtree cannot
                    # leave it exported; tolerate a test having removed it.
                    os.environ.pop('KEEP_LOCAL_STORE', None)
                    shutil.rmtree(self.keeptmp)
            finally:
                run_test_server.reset()

    def assertDirContents(self, subdir, expect_content):
        """Assert that a directory in the mount lists exactly ``expect_content``.

        Args:
            subdir: path relative to ``self.mounttmp``; a falsy value means
                the mount point itself.
            expect_content: iterable of expected entry names; order is
                ignored because both sides are sorted before comparing.
        """
        path = self.mounttmp
        if subdir:
            path = os.path.join(path, subdir)
        self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(str(path))))