21541: Code cleanups
[arvados.git] / services / fuse / tests / mount_test_base.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import
6 import arvados
7 import arvados.keep
8 import arvados_fuse as fuse
9 import arvados.safeapi
10 import llfuse
11 import logging
12 import multiprocessing
13 import os
14 from . import run_test_server
15 import shutil
16 import signal
17 import subprocess
18 import sys
19 import tempfile
20 import threading
21 import time
22 import unittest
23
24 logger = logging.getLogger('arvados.arv-mount')
25
26 from .integration_test import workerPool
27
def make_block_cache(disk_cache):
    """Return a new arvados.keep.KeepBlockCache for use in a test.

    disk_cache is passed straight through to KeepBlockCache.  When it is
    true, the directory ~/.cache/arvados/keep is removed first (errors
    are ignored) -- presumably so the test starts from an empty on-disk
    cache; confirm against KeepBlockCache's default cache location.
    """
    if disk_cache:
        stale_cache = os.path.join(
            os.path.expanduser("~"), ".cache", "arvados", "keep")
        shutil.rmtree(stale_cache, ignore_errors=True)
    return arvados.keep.KeepBlockCache(disk_cache=disk_cache)
34
class MountTestBase(unittest.TestCase):
    """Base class for tests that mount a FUSE filesystem in-process.

    setUp() starts the test server, authorizes as "admin", creates the
    temporary directories and an API client; make_mount() mounts a root
    directory object at self.mounttmp; tearDown() unmounts and cleans up.
    """

    # Forwarded to make_block_cache() when setUp() builds its own API
    # client.
    disk_cache = False

    def setUp(self, api=None, local_store=True):
        """Prepare directories, the test server and the API client.

        api -- API client to use; when it is falsy, a
            ThreadSafeApiCache is built from arvados.config.settings().
        local_store -- when true, create a temporary directory and
            export it as KEEP_LOCAL_STORE; otherwise self.keeptmp is
            None and the environment is left alone.
        """
        # The underlying C implementation of open() makes a fstat()
        # syscall with the GIL still held.  When the GETATTR message
        # comes back to llfuse (which in these tests is in the same
        # interpreter process) it can't acquire the GIL, so it can't
        # service the fstat() call, so it deadlocks.  The workaround is
        # to run some of our test code in a separate process.
        # Fortunately the multiprocessing module makes this relatively
        # easy.
        self.pool = workerPool()

        self.keeptmp = tempfile.mkdtemp() if local_store else None
        if self.keeptmp is not None:
            os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
        self.mounttmp = tempfile.mkdtemp()
        run_test_server.run()
        run_test_server.authorize_with("admin")

        if api:
            self.api = api
        else:
            client_settings = arvados.config.settings()
            block_cache = make_block_cache(self.disk_cache)
            self.api = arvados.safeapi.ThreadSafeApiCache(
                client_settings,
                keep_params={"block_cache": block_cache},
                version='v1',
            )
        # Stays None until make_mount() starts the llfuse thread;
        # tearDown() uses it to decide whether there is anything to
        # unmount.
        self.llfuse_thread = None

    # This is a copy of Mount's method.  TODO: Refactor MountTestBase
    # to use a Mount instead of copying its code.
    def _llfuse_main(self):
        """Run llfuse.main() and close llfuse when it finishes.

        If main() raises, llfuse is closed with unmount=False and the
        exception propagates; otherwise a plain llfuse.close() is done.
        """
        try:
            llfuse.main()
        except BaseException:
            # Catch everything: this is cleanup-and-reraise, not
            # handling.
            llfuse.close(unmount=False)
            raise
        else:
            llfuse.close()

    def make_mount(self, root_class, **root_kwargs):
        """Mount a root_class instance at self.mounttmp.

        root_class -- callable used to build the root directory object.
            It is called with (llfuse.ROOT_INODE, inodes, api, 0,
            enable_write, filters) plus the remaining keyword arguments.
            NOTE(review): the literal 0 is presumably a retry count --
            confirm against the directory classes' signatures.
        root_kwargs -- 'enable_write' (default True) and 'filters'
            (default None) are consumed here; everything else is passed
            to root_class unchanged.

        Returns the entry stored under llfuse.ROOT_INODE in the
        operations' inode table, after the driver has finished
        initializing.
        """
        enable_write = root_kwargs.pop('enable_write', True)
        self.operations = fuse.Operations(
            os.getuid(),
            os.getgid(),
            api_client=self.api,
            enable_write=enable_write,
        )
        inode_table = self.operations.inodes
        filters = root_kwargs.pop('filters', None)
        root_dir = root_class(
            llfuse.ROOT_INODE,
            inode_table,
            self.api,
            0,
            enable_write,
            filters,
            **root_kwargs,
        )
        inode_table.add_entry(root_dir)
        llfuse.init(self.operations, self.mounttmp, [])

        # The lambda defers the lookup of self._llfuse_main until the
        # thread actually runs.
        self.llfuse_thread = threading.Thread(
            target=lambda: self._llfuse_main(),
            daemon=True,
        )
        self.llfuse_thread.start()
        # wait until the driver is finished initializing
        self.operations.initlock.wait()
        return self.operations.inodes[llfuse.ROOT_INODE]

    def _stop_llfuse_thread(self):
        """Unmount and wait for the llfuse thread, if one was started.

        Kills the whole test process with SIGKILL when the thread is
        still alive 60 seconds after the unmount request.
        """
        if not self.llfuse_thread:
            return
        events = self.operations.events
        if events:
            events.close(timeout=10)
        subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
        join_started = time.time()
        self.llfuse_thread.join(timeout=60)
        if self.llfuse_thread.is_alive():
            logger.warning("MountTestBase.tearDown():"
                           " llfuse thread still alive 60s after umount"
                           " -- exiting with SIGKILL")
            os.kill(os.getpid(), signal.SIGKILL)
        waited = time.time() - join_started
        if waited > 0.1:
            logger.warning("MountTestBase.tearDown(): waited %f s for llfuse thread to end", waited)

    def tearDown(self):
        """Unmount, remove the temporary directories, reset the server."""
        self._stop_llfuse_thread()

        os.rmdir(self.mounttmp)
        if self.keeptmp:
            shutil.rmtree(self.keeptmp)
            del os.environ['KEEP_LOCAL_STORE']
        run_test_server.reset()

    def assertDirContents(self, subdir, expect_content):
        """Assert that a directory in the mount lists expect_content.

        subdir -- path relative to self.mounttmp; a falsy value means
            the mount point itself.
        expect_content -- iterable of expected entry names; order does
            not matter because both sides are sorted before comparing.
        """
        if subdir:
            target = os.path.join(self.mounttmp, subdir)
        else:
            target = self.mounttmp
        expected = sorted(expect_content)
        actual = sorted(llfuse.listdir(str(target)))
        self.assertEqual(expected, actual)
124             path = os.path.join(path, subdir)
125         self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(str(path))))