20831: Fix typo
[arvados.git] / services / fuse / tests / mount_test_base.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import
6 import arvados
7 import arvados.keep
8 import arvados_fuse as fuse
9 import arvados.safeapi
10 import llfuse
11 import logging
12 import multiprocessing
13 import os
14 from . import run_test_server
15 import shutil
16 import signal
17 import subprocess
18 import sys
19 import tempfile
20 import threading
21 import time
22 import unittest
23
24 logger = logging.getLogger('arvados.arv-mount')
25
26 from .integration_test import workerPool
27
28 def make_block_cache(disk_cache):
29     if disk_cache:
30         disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
31         shutil.rmtree(disk_cache_dir, ignore_errors=True)
32     block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
33     return block_cache
34
35 class MountTestBase(unittest.TestCase):
36     disk_cache = False
37
38     def setUp(self, api=None, local_store=True):
39         # The underlying C implementation of open() makes a fstat() syscall
40         # with the GIL still held.  When the GETATTR message comes back to
41         # llfuse (which in these tests is in the same interpreter process) it
42         # can't acquire the GIL, so it can't service the fstat() call, so it
43         # deadlocks.  The workaround is to run some of our test code in a
44         # separate process.  Forturnately the multiprocessing module makes this
45         # relatively easy.
46
47         self.pool = workerPool()
48         if local_store:
49             self.keeptmp = tempfile.mkdtemp()
50             os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
51         else:
52             self.keeptmp = None
53         self.mounttmp = tempfile.mkdtemp()
54         run_test_server.run()
55         run_test_server.authorize_with("admin")
56
57         self.api = api if api else arvados.safeapi.ThreadSafeApiCache(
58             arvados.config.settings(),
59             keep_params={"block_cache": make_block_cache(self.disk_cache)},
60             version='v1',
61         )
62         self.llfuse_thread = None
63
64     # This is a copy of Mount's method.  TODO: Refactor MountTestBase
65     # to use a Mount instead of copying its code.
66     def _llfuse_main(self):
67         try:
68             llfuse.main()
69         except:
70             llfuse.close(unmount=False)
71             raise
72         llfuse.close()
73
74     def make_mount(self, root_class, **root_kwargs):
75         enable_write = True
76         if 'enable_write' in root_kwargs:
77             enable_write = root_kwargs.pop('enable_write')
78         self.operations = fuse.Operations(
79             os.getuid(), os.getgid(),
80             api_client=self.api,
81             enable_write=enable_write)
82         self.operations.inodes.add_entry(root_class(
83             llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, enable_write, **root_kwargs))
84         llfuse.init(self.operations, self.mounttmp, [])
85         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
86         self.llfuse_thread.daemon = True
87         self.llfuse_thread.start()
88         # wait until the driver is finished initializing
89         self.operations.initlock.wait()
90         return self.operations.inodes[llfuse.ROOT_INODE]
91
92     def tearDown(self):
93         if self.llfuse_thread:
94             if self.operations.events:
95                 self.operations.events.close(timeout=10)
96             subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
97             t0 = time.time()
98             self.llfuse_thread.join(timeout=10)
99             if self.llfuse_thread.is_alive():
100                 logger.warning("MountTestBase.tearDown():"
101                                " llfuse thread still alive 10s after umount"
102                                " -- exiting with SIGKILL")
103                 os.kill(os.getpid(), signal.SIGKILL)
104             waited = time.time() - t0
105             if waited > 0.1:
106                 logger.warning("MountTestBase.tearDown(): waited %f s for llfuse thread to end", waited)
107
108         os.rmdir(self.mounttmp)
109         if self.keeptmp:
110             shutil.rmtree(self.keeptmp)
111             os.environ.pop('KEEP_LOCAL_STORE')
112         run_test_server.reset()
113
114     def assertDirContents(self, subdir, expect_content):
115         path = self.mounttmp
116         if subdir:
117             path = os.path.join(path, subdir)
118         self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(str(path))))