+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
import collections
import errno
import os
return mi
-def unmount_all(path, timeout=10):
- if not path.endswith("/..."):
- return unmount(path, timeout=timeout)
- root = os.path.realpath(path[:-4])
-
- paths = []
- for m in mountinfo():
- if m.path == root or m.path.startswith(root+"/"):
- paths.append(m.path)
- if not m.is_fuse:
- raise Exception(
- "cannot unmount {}: non-fuse mountpoint {}".format(
- path, m))
- for path in sorted(paths, key=len, reverse=True):
- unmount(path, timeout=timeout)
- return len(paths) > 0
-
-
-def unmount(path, timeout=10):
+def unmount(path, subtype=None, timeout=10, recursive=False):
"""Unmount the fuse mount at path.
Unmounting is done by writing 1 to the "abort" control file in
path = os.path.realpath(path)
+ if subtype is None:
+ mnttype = None
+ elif subtype == '':
+ mnttype = 'fuse'
+ else:
+ mnttype = 'fuse.' + subtype
+
+ if recursive:
+ paths = []
+ for m in mountinfo():
+ if m.path == path or m.path.startswith(path+"/"):
+ paths.append(m.path)
+ if not (m.is_fuse and (mnttype is None or
+ mnttype == m.mnttype)):
+ raise Exception(
+ "cannot unmount {}: mount type is {}".format(
+ path, m.mnttype))
+ for path in sorted(paths, key=len, reverse=True):
+ unmount(path, timeout=timeout, recursive=False)
+ return len(paths) > 0
+
was_mounted = False
attempted = False
if timeout is None:
while True:
mounted = False
for m in mountinfo():
- if m.is_fuse:
+ if m.is_fuse and (mnttype is None or mnttype == m.mnttype):
try:
if os.path.realpath(m.path) == path:
was_mounted = True