1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
9 from integration_test import IntegrationTest
11 class UnmountTest(IntegrationTest):
13 super(UnmountTest, self).setUp()
18 for d in self.to_delete:
20 super(UnmountTest, self).tearDown()
22 def test_replace(self):
23 subprocess.check_call(
24 ['./bin/arv-mount', '--subtype', 'test', '--replace',
26 subprocess.check_call(
27 ['./bin/arv-mount', '--subtype', 'test', '--replace',
28 '--unmount-timeout', '10',
30 subprocess.check_call(
31 ['./bin/arv-mount', '--subtype', 'test', '--replace',
32 '--unmount-timeout', '10',
35 for m in subprocess.check_output(['mount']).splitlines():
36 self.assertNotIn(' '+self.mnt+' ', m)
38 def _mounted(self, mounts):
39 all_mounts = subprocess.check_output(['mount'])
40 return [m for m in mounts
41 if ' '+m+' ' in all_mounts]
43 def _wait_for_mounts(self, mounts):
44 deadline = time.time() + 10
45 while self._mounted(mounts) != mounts:
47 self.assertLess(time.time(), deadline)
49 def test_unmount_subtype(self):
51 for d in ['foo', 'bar']:
54 self.to_delete.insert(0, mnt)
56 subprocess.check_call(
57 ['./bin/arv-mount', '--subtype', d, mnt])
59 self._wait_for_mounts(mounts)
60 self.assertEqual(mounts, self._mounted(mounts))
61 subprocess.call(['./bin/arv-mount', '--subtype', 'baz', '--unmount-all', self.tmp])
62 self.assertEqual(mounts, self._mounted(mounts))
63 subprocess.call(['./bin/arv-mount', '--subtype', 'bar', '--unmount', mounts[0]])
64 self.assertEqual(mounts, self._mounted(mounts))
65 subprocess.call(['./bin/arv-mount', '--subtype', '', '--unmount', self.tmp])
66 self.assertEqual(mounts, self._mounted(mounts))
67 subprocess.check_call(['./bin/arv-mount', '--subtype', 'foo', '--unmount', mounts[0]])
68 self.assertEqual(mounts[1:], self._mounted(mounts))
69 subprocess.check_call(['./bin/arv-mount', '--subtype', '', '--unmount-all', mounts[0]])
70 self.assertEqual(mounts[1:], self._mounted(mounts))
71 subprocess.check_call(['./bin/arv-mount', '--subtype', 'bar', '--unmount-all', self.tmp])
72 self.assertEqual([], self._mounted(mounts))
74 def test_unmount_children(self):
75 for d in ['foo', 'foo/bar', 'bar']:
78 self.to_delete.insert(0, mnt)
80 for d in ['bar', 'foo/bar']:
83 subprocess.check_call(
84 ['./bin/arv-mount', '--subtype', 'test', mnt])
86 self._wait_for_mounts(mounts)
87 self.assertEqual(mounts, self._mounted(mounts))
88 subprocess.check_call(['./bin/arv-mount', '--unmount', self.tmp])
89 self.assertEqual(mounts, self._mounted(mounts))
90 subprocess.check_call(['./bin/arv-mount', '--unmount-all', self.tmp])
91 self.assertEqual([], self._mounted(mounts))