11644: Use filesystem UUID and path as DeviceID for local disk volumes.
[arvados.git] / services / fuse / tests / test_unmount.py
1 import os
2 import subprocess
3 import time
4
5 from integration_test import IntegrationTest
6
class UnmountTest(IntegrationTest):
    """Integration tests for arv-mount's --replace, --unmount and --unmount-all.

    Each test runs ``./bin/arv-mount`` as a subprocess and then inspects the
    output of the system ``mount`` command to decide which mount points are
    currently active.
    """

    def setUp(self):
        """Run the base-class setup and initialise per-test bookkeeping."""
        super(UnmountTest, self).setUp()
        # self.mnt is not assigned in this class; presumably
        # IntegrationTest.setUp() provides it -- verify against the base class.
        self.tmp = self.mnt
        # Directories created by a test, removed again in tearDown().
        self.to_delete = []

    def tearDown(self):
        """Remove the directories created by the test, then run base teardown."""
        try:
            for d in self.to_delete:
                os.rmdir(d)
        finally:
            # Always give the base class a chance to clean up, even when
            # removing one of our directories fails.
            super(UnmountTest, self).tearDown()

    def _mount_output(self):
        """Return the output of the ``mount`` command as text.

        On Python 3 ``subprocess.check_output`` returns bytes, which cannot be
        searched with a ``str`` needle; decode it so the substring checks in
        this class work on both Python 2 and Python 3.
        """
        output = subprocess.check_output(['mount'])
        if not isinstance(output, str):
            output = output.decode('utf-8', 'replace')
        return output

    def test_replace(self):
        """Run arv-mount --replace repeatedly on self.mnt, then check it is unmounted."""
        subprocess.check_call(
            ['./bin/arv-mount', '--subtype', 'test', '--replace',
             self.mnt])
        subprocess.check_call(
            ['./bin/arv-mount', '--subtype', 'test', '--replace',
             '--unmount-timeout', '10',
             self.mnt])
        subprocess.check_call(
            ['./bin/arv-mount', '--subtype', 'test', '--replace',
             '--unmount-timeout', '10',
             self.mnt,
             '--exec', 'true'])
        # After the final invocation, no line of the mount table may mention
        # self.mnt as a whole (space-delimited) field.
        for m in self._mount_output().splitlines():
            self.assertNotIn(' '+self.mnt+' ', m)

    def _mounted(self, mounts):
        """Return the entries of *mounts* that appear in the ``mount`` output.

        A path counts as mounted when it occurs surrounded by single spaces
        in the output. The order of *mounts* is preserved.
        """
        all_mounts = self._mount_output()
        return [m for m in mounts
                if ' '+m+' ' in all_mounts]

    def _wait_for_mounts(self, mounts):
        """Poll until every path in *mounts* is mounted; fail after about 10 seconds."""
        deadline = time.time() + 10
        while self._mounted(mounts) != mounts:
            time.sleep(0.1)
            self.assertLess(time.time(), deadline,
                            'timed out waiting for mounts: %r' % (mounts,))

    def test_unmount_subtype(self):
        """Check which --subtype/--unmount combinations leave mounts in place."""
        mounts = []
        for d in ['foo', 'bar']:
            mnt = self.tmp+'/'+d
            os.mkdir(mnt)
            # Insert at the front so tearDown() removes directories in
            # reverse order of creation.
            self.to_delete.insert(0, mnt)
            mounts.append(mnt)
            subprocess.check_call(
                ['./bin/arv-mount', '--subtype', d, mnt])

        self._wait_for_mounts(mounts)
        self.assertEqual(mounts, self._mounted(mounts))
        # The next three invocations use subprocess.call, so their exit status
        # is not checked; each one is expected to leave both mounts in place.
        subprocess.call(['./bin/arv-mount', '--subtype', 'baz', '--unmount-all', self.tmp])
        self.assertEqual(mounts, self._mounted(mounts))
        subprocess.call(['./bin/arv-mount', '--subtype', 'bar', '--unmount', mounts[0]])
        self.assertEqual(mounts, self._mounted(mounts))
        subprocess.call(['./bin/arv-mount', '--subtype', '', '--unmount', self.tmp])
        self.assertEqual(mounts, self._mounted(mounts))
        # Matching subtype 'foo' on the 'foo' mount point: only 'bar' remains.
        subprocess.check_call(['./bin/arv-mount', '--subtype', 'foo', '--unmount', mounts[0]])
        self.assertEqual(mounts[1:], self._mounted(mounts))
        # --unmount-all on the already-unmounted path must succeed and must
        # not affect the remaining 'bar' mount.
        subprocess.check_call(['./bin/arv-mount', '--subtype', '', '--unmount-all', mounts[0]])
        self.assertEqual(mounts[1:], self._mounted(mounts))
        # --unmount-all with subtype 'bar' under self.tmp removes the last mount.
        subprocess.check_call(['./bin/arv-mount', '--subtype', 'bar', '--unmount-all', self.tmp])
        self.assertEqual([], self._mounted(mounts))

    def test_unmount_children(self):
        """Check that --unmount-all on a parent directory unmounts nested mounts."""
        for d in ['foo', 'foo/bar', 'bar']:
            mnt = self.tmp+'/'+d
            os.mkdir(mnt)
            # Front insertion makes tearDown() remove 'foo/bar' before 'foo'.
            self.to_delete.insert(0, mnt)
        mounts = []
        for d in ['bar', 'foo/bar']:
            mnt = self.tmp+'/'+d
            mounts.append(mnt)
            subprocess.check_call(
                ['./bin/arv-mount', '--subtype', 'test', mnt])

        self._wait_for_mounts(mounts)
        self.assertEqual(mounts, self._mounted(mounts))
        # Plain --unmount on the parent is expected to leave the mounts
        # beneath it untouched.
        subprocess.check_call(['./bin/arv-mount', '--unmount', self.tmp])
        self.assertEqual(mounts, self._mounted(mounts))
        # --unmount-all on the parent is expected to unmount everything
        # beneath it.
        subprocess.check_call(['./bin/arv-mount', '--unmount-all', self.tmp])
        self.assertEqual([], self._mounted(mounts))