8288: Give fusermount -u a chance to work before resorting to operations.destroy().
author Tom Clegg <tom@curoverse.com>
Mon, 1 Feb 2016 02:43:30 +0000 (21:43 -0500)
committer Tom Clegg <tom@curoverse.com>
Wed, 3 Feb 2016 17:51:38 +0000 (12:51 -0500)
Log a warning when resorting to operations.destroy().

De-duplicate setup/teardown code so more of the --exec code path is exercised in tests.

services/fuse/arvados_fuse/command.py
services/fuse/tests/integration_test.py
services/fuse/tests/mount_test_base.py

diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 71623a5f3d07364bb4de94c42023af3b5658fc55..ae0c920966f83988da35844f30fef5af8ac9c977 100644 (file)
@@ -82,6 +82,10 @@ class ArgumentParser(argparse.ArgumentParser):
 
         self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)
 
+        self.add_argument('--unmount-timeout',
+                          type=float, default=2.0,
+                          help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")
+
         self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                             dest="exec_args", metavar=('command', 'args', '...', '--'),
                             help="""Mount, run a command, then unmount and exit""")
@@ -108,13 +112,20 @@ class Mount(object):
         llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
         if self.args.mode != 'by_pdh':
             self.operations.listen_for_events()
-        t = threading.Thread(None, lambda: llfuse.main())
-        t.start()
+        self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+        self.llfuse_thread.daemon = True
+        self.llfuse_thread.start()
         self.operations.initlock.wait()
 
     def __exit__(self, exc_type, exc_value, traceback):
         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
-        self.operations.destroy()
+        self.llfuse_thread.join(timeout=self.args.unmount_timeout)
+        if self.llfuse_thread.is_alive():
+            self.logger.warning("Mount.__exit__:"
+                                " llfuse thread still alive %fs after umount"
+                                " -- resorting to operations.destroy()",
+                                self.args.unmount_timeout)
+            self.operations.destroy()
 
     def run(self):
         if self.args.exec_args:
@@ -277,63 +288,56 @@ From here, the following directories are available:
 '''.format(api_host, user_email)
 
     def _run_exec(self):
-        # Initialize the fuse connection
-        llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-
-        # Subscribe to change events from API server
-        if self.args.mode != 'by_pdh':
-            self.operations.listen_for_events()
-
-        t = threading.Thread(None, lambda: llfuse.main())
-        t.start()
-
-        # wait until the driver is finished initializing
-        self.operations.initlock.wait()
-
         rc = 255
-        try:
-            sp = subprocess.Popen(self.args.exec_args, shell=False)
-
-            # forward signals to the process.
-            signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
-            signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
-            signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
-
-            # wait for process to complete.
-            rc = sp.wait()
-
-            # restore default signal handlers.
-            signal.signal(signal.SIGINT, signal.SIG_DFL)
-            signal.signal(signal.SIGTERM, signal.SIG_DFL)
-            signal.signal(signal.SIGQUIT, signal.SIG_DFL)
-        except Exception as e:
-            self.logger.exception(
-                'arv-mount: exception during exec %s', self.args.exec_args)
+        with self:
             try:
-                rc = e.errno
-            except AttributeError:
-                pass
-        finally:
-            subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
-            self.operations.destroy()
+                sp = subprocess.Popen(self.args.exec_args, shell=False)
+
+                # forward signals to the process.
+                signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
+                signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
+                signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
+
+                # wait for process to complete.
+                rc = sp.wait()
+
+                # restore default signal handlers.
+                signal.signal(signal.SIGINT, signal.SIG_DFL)
+                signal.signal(signal.SIGTERM, signal.SIG_DFL)
+                signal.signal(signal.SIGQUIT, signal.SIG_DFL)
+            except Exception as e:
+                self.logger.exception(
+                    'arv-mount: exception during exec %s', self.args.exec_args)
+                try:
+                    rc = e.errno
+                except AttributeError:
+                    pass
         exit(rc)
 
     def _run_standalone(self):
         try:
             llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
 
-            if not (self.args.exec_args or self.args.foreground):
-                self.daemon_ctx = daemon.DaemonContext(working_directory=os.path.dirname(self.args.mountpoint),
-                                                       files_preserve=range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
+            if not self.args.foreground:
+                self.daemon_ctx = daemon.DaemonContext(
+                    working_directory=os.path.dirname(self.args.mountpoint),
+                    files_preserve=range(
+                        3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
                 self.daemon_ctx.open()
 
             # Subscribe to change events from API server
             self.operations.listen_for_events()
 
-            llfuse.main()
+            self._llfuse_main()
         except Exception as e:
             self.logger.exception('arv-mount: exception during mount: %s', e)
             exit(getattr(e, 'errno', 1))
-        finally:
-            self.operations.destroy()
         exit(0)
+
+    def _llfuse_main(self):
+        try:
+            llfuse.main()
+        except:
+            llfuse.close(unmount=False)
+            raise
+        llfuse.close()
diff --git a/services/fuse/tests/integration_test.py b/services/fuse/tests/integration_test.py
index faa4a55065945d907109b8994470ccccb2c5904d..5a45bfc103f34df2ae928ed5bcd305d100c408fa 100644 (file)
@@ -62,7 +62,9 @@ class IntegrationTest(unittest.TestCase):
             def wrapper(self, *args, **kwargs):
                 with arvados_fuse.command.Mount(
                         arvados_fuse.command.ArgumentParser().parse_args(
-                            argv + ['--foreground', self.mnt])):
+                            argv + ['--foreground',
+                                    '--unmount-timeout=0.1',
+                                    self.mnt])):
                     return func(self, *args, **kwargs)
             return wrapper
         return decorator
diff --git a/services/fuse/tests/mount_test_base.py b/services/fuse/tests/mount_test_base.py
index 44ec1996d2121698b478e2ba25a6ae95d0bef4c7..91a492971bb2c4bc16dce534bf153c458049f3b8 100644 (file)
@@ -37,6 +37,16 @@ class MountTestBase(unittest.TestCase):
         run_test_server.authorize_with("admin")
         self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
+    # This is a copy of Mount's method.  TODO: Refactor MountTestBase
+    # to use a Mount instead of copying its code.
+    def _llfuse_main(self):
+        try:
+            llfuse.main()
+        except:
+            llfuse.close(unmount=False)
+            raise
+        llfuse.close()
+
     def make_mount(self, root_class, **root_kwargs):
         self.operations = fuse.Operations(
             os.getuid(), os.getgid(),
@@ -45,7 +55,9 @@ class MountTestBase(unittest.TestCase):
         self.operations.inodes.add_entry(root_class(
             llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
         llfuse.init(self.operations, self.mounttmp, [])
-        threading.Thread(None, llfuse.main).start()
+        self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+        self.llfuse_thread.daemon = True
+        self.llfuse_thread.start()
         # wait until the driver is finished initializing
         self.operations.initlock.wait()
         return self.operations.inodes[llfuse.ROOT_INODE]
@@ -55,17 +67,13 @@ class MountTestBase(unittest.TestCase):
         self.pool.join()
         del self.pool
 
-        # llfuse.close is buggy, so use fusermount instead.
-        #llfuse.close(unmount=True)
-
-        count = 0
-        success = 1
-        while (count < 9 and success != 0):
-          success = subprocess.call(["fusermount", "-u", self.mounttmp])
-          time.sleep(0.1)
-          count += 1
-
-        self.operations.destroy()
+        subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
+        self.llfuse_thread.join(timeout=0.1)
+        if self.llfuse_thread.is_alive():
+            logger.warning("MountTestBase.tearDown():"
+                           " llfuse thread still alive 100ms after umount"
+                           " -- resorting to operations.destroy()")
+            self.operations.destroy()
 
         os.rmdir(self.mounttmp)
         if self.keeptmp: