Merge branch 'main' from arvados-workbench2.git
[arvados.git] / services / fuse / tests / test_unmount.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import
6 from builtins import bytes
7 import arvados_fuse.unmount
8 import os
9 import subprocess
10 import shutil
11 import tempfile
12 import time
13 import unittest
14
15 from .integration_test import IntegrationTest
16
class UnmountTest(IntegrationTest):
    """Exercise arv-mount's --replace, --unmount and --unmount-all options.

    Every test runs ``./bin/arv-mount`` as a subprocess and inspects the
    output of the system ``mount`` command to decide which mount points
    are currently active.
    """

    def setUp(self):
        """Prepare per-test state on top of the base-class fixture."""
        super(UnmountTest, self).setUp()
        # self.mnt is not defined in this class; presumably the base
        # class provides it as a scratch mount point -- verify against
        # IntegrationTest.
        self.tmp = self.mnt
        # Directories created by a test, to be removed in tearDown.
        # Tests insert at the front, so entries are removed in reverse
        # creation order (children before their parents).
        self.to_delete = []

    def tearDown(self):
        """Remove directories created by the test, then run base cleanup."""
        try:
            for d in self.to_delete:
                os.rmdir(d)
        finally:
            # Always give the base class a chance to clean up, even if
            # an rmdir above failed (e.g. a directory is still mounted
            # because the test failed part-way through).
            super(UnmountTest, self).tearDown()

    def test_replace(self):
        """Repeated --replace invocations leave nothing mounted at self.mnt
        once the final ``--exec true`` invocation has returned."""
        subprocess.check_call(
            ['./bin/arv-mount', '--subtype', 'test', '--replace',
             self.mnt])
        subprocess.check_call(
            ['./bin/arv-mount', '--subtype', 'test', '--replace',
             '--unmount-timeout', '10',
             self.mnt])
        subprocess.check_call(
            ['./bin/arv-mount', '--subtype', 'test', '--replace',
             '--unmount-timeout', '10',
             self.mnt,
             '--exec', 'true'])
        # Surrounding spaces keep the match from hitting a longer path
        # that merely starts with self.mnt.
        expected = bytes(' ' + self.mnt + ' ', encoding='utf-8')
        for m in subprocess.check_output(['mount']).splitlines():
            self.assertNotIn(expected, m)

    def _mounted(self, mounts):
        """Return the entries of *mounts* (in their original order) that
        appear, space-delimited, in the output of ``mount``."""
        all_mounts = subprocess.check_output(['mount'])
        return [m for m in mounts
                if bytes(' ' + m + ' ', encoding='utf-8') in all_mounts]

    def _wait_for_mounts(self, mounts):
        """Poll until every entry of *mounts* is mounted.

        Fails the test if that takes longer than 10 seconds.
        """
        deadline = time.time() + 10
        while self._mounted(mounts) != mounts:
            time.sleep(0.1)
            self.assertLess(time.time(), deadline)

    def test_unmount_subtype(self):
        """--unmount/--unmount-all only affect mounts whose subtype matches."""
        mounts = []
        for d in ['foo', 'bar']:
            mnt = self.tmp+'/'+d
            os.mkdir(mnt)
            self.to_delete.insert(0, mnt)
            mounts.append(mnt)
            subprocess.check_call(
                ['./bin/arv-mount', '--subtype', d, mnt])

        self._wait_for_mounts(mounts)
        self.assertEqual(mounts, self._mounted(mounts))

        # The next three invocations use subprocess.call, so their exit
        # status is ignored; the test only checks that both mounts
        # survive them.
        # Subtype that matches neither mount.
        subprocess.call(['./bin/arv-mount', '--subtype', 'baz', '--unmount-all', self.tmp])
        self.assertEqual(mounts, self._mounted(mounts))
        # Subtype 'bar' aimed at the 'foo' mount.
        subprocess.call(['./bin/arv-mount', '--subtype', 'bar', '--unmount', mounts[0]])
        self.assertEqual(mounts, self._mounted(mounts))
        # Empty subtype aimed at the parent directory.
        subprocess.call(['./bin/arv-mount', '--subtype', '', '--unmount', self.tmp])
        self.assertEqual(mounts, self._mounted(mounts))

        # Matching subtype and path: only the 'foo' mount goes away.
        subprocess.check_call(['./bin/arv-mount', '--subtype', 'foo', '--unmount', mounts[0]])
        self.assertEqual(mounts[1:], self._mounted(mounts))
        # --unmount-all on the already-unmounted 'foo' path must succeed
        # and leave the 'bar' mount alone.
        subprocess.check_call(['./bin/arv-mount', '--subtype', '', '--unmount-all', mounts[0]])
        self.assertEqual(mounts[1:], self._mounted(mounts))
        # Matching subtype on the parent directory: 'bar' is unmounted too.
        subprocess.check_call(['./bin/arv-mount', '--subtype', 'bar', '--unmount-all', self.tmp])
        self.assertEqual([], self._mounted(mounts))

    def test_unmount_children(self):
        """--unmount on a parent directory leaves mounts below it in place;
        --unmount-all on the same directory removes them."""
        for d in ['foo', 'foo/bar', 'bar']:
            mnt = self.tmp+'/'+d
            os.mkdir(mnt)
            self.to_delete.insert(0, mnt)
        mounts = []
        for d in ['bar', 'foo/bar']:
            mnt = self.tmp+'/'+d
            mounts.append(mnt)
            subprocess.check_call(
                ['./bin/arv-mount', '--subtype', 'test', mnt])

        self._wait_for_mounts(mounts)
        self.assertEqual(mounts, self._mounted(mounts))
        subprocess.check_call(['./bin/arv-mount', '--unmount', self.tmp])
        self.assertEqual(mounts, self._mounted(mounts))
        subprocess.check_call(['./bin/arv-mount', '--unmount-all', self.tmp])
        self.assertEqual([], self._mounted(mounts))
99
100
101
class SaferRealpath(unittest.TestCase):
    """Check arvados_fuse.unmount.safer_realpath against a tree of
    directories and symlinks built in a scratch directory."""

    def setUp(self):
        # Fresh scratch directory for each test.
        self.tmp = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmp)

    def test_safer_realpath(self):
        """safer_realpath returns a (path, ok) pair for each fixture path."""
        base = self.tmp

        # Real directories.
        for subdir in ("/dir", "/dir/dir2"):
            os.mkdir(base + subdir)

        # (link target, link location relative to the scratch directory)
        links = [
            ("missing", "/relative-missing"),
            ("dir", "/./relative-dir"),
            ("relative-dir", "/relative-indirect"),
            (base + "/dir", "/absolute-dir"),
            ("./dir/../loop", "/loop"),
            (".", "/dir/self"),
            ("..", "/dir/dir2/parent"),
            ("../dir3", "/dir/dir2/sibling"),
            ("../missing/../danger", "/dir/tricky"),
            ("/proc/1/fd/12345", "/eperm"),
        ]
        for target, location in links:
            os.symlink(target, base + location)

        # (input path, expected path, expected ok flag).  Relative
        # expected paths are taken relative to the scratch directory.
        cases = [
            ("dir/self", "dir", True),
            ("dir/dir2/parent", "dir", True),
            ("dir/dir2/sibling", "dir/dir3", False),
            ("dir", "dir", True),
            ("relative-dir", "dir", True),
            ("relative-missing", "missing", False),
            ("relative-indirect", "dir", True),
            ("absolute-dir", "dir", True),
            ("loop", "loop", False),
            # "missing" doesn't exist, so "missing/.." isn't our
            # tmpdir; it's important not to contract this to just
            # "danger".
            ("dir/tricky", "missing/../danger", False),
            ("eperm", "/proc/1/fd/12345", False),
        ]
        for inpath, outpath, ok in cases:
            if outpath.startswith('/'):
                expected = outpath
            else:
                expected = base + '/' + outpath
            actual = arvados_fuse.unmount.safer_realpath(base + "/" + inpath)
            self.assertEqual((expected, ok), actual)