11925: Explicitly join actor thread after stopping.
authorPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 3 Aug 2017 18:57:41 +0000 (14:57 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 3 Aug 2017 18:57:41 +0000 (14:57 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curoverse.com>

services/nodemanager/arvnodeman/baseactor.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_failure.py
services/nodemanager/tests/testutil.py

index 0142db2dc3e3c173a45d58ea236b03c79610832a..565db6601f18e68f5f621e0838ee06e051038028 100644 (file)
@@ -82,17 +82,20 @@ class BaseNodeManagerActor(pykka.ThreadingActor):
     def __init__(self, *args, **kwargs):
          super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
          self.actor_ref = TellableActorRef(self)
+         self._killfunc = kwargs.get("killfunc", os.kill)
 
     def on_failure(self, exception_type, exception_value, tb):
         lg = getattr(self, "_logger", logging)
         if (exception_type in (threading.ThreadError, MemoryError) or
             exception_type is OSError and exception_value.errno == errno.ENOMEM):
             lg.critical("Unhandled exception is a fatal error, killing Node Manager")
-            os.kill(os.getpid(), signal.SIGKILL)
+            self._killfunc(os.getpid(), signal.SIGKILL)
 
     def ping(self):
         return True
 
+    def get_thread(self):
+        return threading.current_thread()
 
 class WatchdogActor(pykka.ThreadingActor):
     def __init__(self, timeout, *args, **kwargs):
index 3c5fee8566880c6705d70c3f9859a827fd9ee44f..3408d292e2b53294cb9bb3ae4e47ca2aaae338ec 100644 (file)
@@ -169,7 +169,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
         self.check_monitors_arvados_nodes(arv_node)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.assertEqual(0, self.alive_monitor_count())
+        self.busywait(lambda: 0 == self.alive_monitor_count())
         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
         self.busywait(lambda: 1 == self.alive_monitor_count(),
                       lambda: self.stop_proxy(self.daemon))
@@ -204,8 +204,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                             2,
                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size, size])
-        self.busywait(lambda: self.node_setup.start.called,
-                      lambda: self.stop_proxy(self.daemon))
+        time.sleep(2)
+        self.busywait(lambda: self.node_setup.start.called)
 
     def test_missing_counts_towards_max(self):
         size = testutil.MockSize(1)
@@ -525,6 +525,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                      testutil.arvados_node_mock(4, job_uuid=None)]
         self.make_daemon(cloud_nodes, arv_nodes, [size])
+        self.daemon.ping().get(self.TIMEOUT)
         self.assertEqual(2, self.alive_monitor_count())
         for mon_ref in self.monitor_list():
             monitor = mon_ref.proxy()
@@ -697,11 +698,11 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         booting = self.daemon.booting.get()
         cloud_nodes = self.daemon.cloud_nodes.get()
 
-        self.stop_proxy(self.daemon)
-
         self.busywait(lambda: 1 == self.node_setup.start.call_count)
         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
 
+        self.stop_proxy(self.daemon)
+
         # booting a new big node
         sizecounts = {a[0].id: 0 for a in avail_sizes}
         for b in booting.itervalues():
index 03cc5df868329770ade28b386feabea17ff45fad..ef4423dafaf7762b5d8a8c95fdbaacf630156917 100644 (file)
@@ -19,8 +19,8 @@ from . import testutil
 import arvnodeman.baseactor
 
 class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
-    def __init__(self, e):
-        super(BogusActor, self).__init__()
+    def __init__(self, e, killfunc=None):
+        super(BogusActor, self).__init__(killfunc=killfunc)
         self.exp = e
 
     def doStuff(self):
@@ -35,15 +35,18 @@ class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
 class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
     def test_fatal_error(self):
         for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
-            with mock.patch('os.kill') as kill_mock:
-                act = BogusActor.start(e).tell_proxy()
-                act.doStuff()
-                act.actor_ref.stop(block=True)
-                self.assertTrue(kill_mock.called)
-
-    @mock.patch('os.kill')
-    def test_nonfatal_error(self, kill_mock):
-        act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+            kill_mock = mock.Mock('os.kill')
+            bgact = BogusActor.start(e, killfunc=kill_mock)
+            act_thread = bgact.proxy().get_thread().get()
+            act = bgact.tell_proxy()
+            act.doStuff()
+            act.actor_ref.stop(block=True)
+            act_thread.join()
+            self.assertTrue(kill_mock.called)
+
+    def test_nonfatal_error(self):
+        kill_mock = mock.Mock('os.kill')
+        act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
         act.doStuff()
         act.actor_ref.stop(block=True)
         self.assertFalse(kill_mock.called)
index df8c7c7d79d90e39ebe0d4ee8b9d89f319e06c15..6e134375bb8aec05bdd71f830e28f277d3cff5b5 100644 (file)
@@ -123,7 +123,10 @@ class ActorTestMixin(object):
         pykka.ActorRegistry.stop_all()
 
     def stop_proxy(self, proxy):
-        return proxy.actor_ref.stop(timeout=self.TIMEOUT)
+        th = proxy.get_thread().get()
+        t = proxy.actor_ref.stop(timeout=self.TIMEOUT)
+        th.join()
+        return t
 
     def wait_for_assignment(self, proxy, attr_name, unassigned=None,
                             timeout=TIMEOUT):