def __init__(self, *args, **kwargs):
    """Initialise the actor with a tell-capable reference and a kill hook.

    Keyword Args:
        killfunc: optional callable invoked as ``killfunc(pid, signum)``
            by ``on_failure`` when a fatal error is detected.  Defaults
            to ``os.kill``; an explicit ``None`` also selects ``os.kill``.
    """
    # NOTE(review): super() is anchored at pykka.ThreadingActor, so method
    # lookup starts *after* that class in the MRO -- confirm this is intended.
    super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
    self.actor_ref = TellableActorRef(self)
    # kwargs.get("killfunc", os.kill) would keep an explicitly passed None
    # (subclasses forward killfunc=None as their own default), leaving the
    # fatal-error path with a non-callable hook.  Fall back on any falsy value.
    self._killfunc = kwargs.get("killfunc") or os.kill
def on_failure(self, exception_type, exception_value, tb):
    """Kill the whole process when an unhandled exception is fatal.

    An exception counts as fatal when its type is exactly
    ``threading.ThreadError`` or ``MemoryError``, or when it is exactly
    ``OSError`` with ``errno == ENOMEM``.  Any other exception is ignored
    by this handler.

    Args:
        exception_type: class of the unhandled exception.
        exception_value: the exception instance.
        tb: traceback object (unused here).
    """
    lg = getattr(self, "_logger", logging)
    # Read the kill hook as defensively as the logger: a subclass that did
    # not run the base __init__, or that stored None, must still be able to
    # terminate the process rather than raise from inside the handler.
    killfunc = getattr(self, "_killfunc", None) or os.kill

    out_of_resources = exception_type in (threading.ThreadError, MemoryError)
    out_of_memory = (exception_type is OSError and
                     exception_value.errno == errno.ENOMEM)
    if out_of_resources or out_of_memory:
        lg.critical("Unhandled exception is a fatal error, killing Node Manager")
        killfunc(os.getpid(), signal.SIGKILL)
def ping(self):
    """Liveness probe: unconditionally answers True."""
    alive = True
    return alive
def get_thread(self):
    """Return the thread object that is executing this call."""
    running = threading.current_thread()
    return running
class WatchdogActor(pykka.ThreadingActor):
def __init__(self, timeout, *args, **kwargs):
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
self.check_monitors_arvados_nodes(arv_node)
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
- self.assertEqual(0, self.alive_monitor_count())
+ self.busywait(lambda: 0 == self.alive_monitor_count())
self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
self.busywait(lambda: 1 == self.alive_monitor_count(),
lambda: self.stop_proxy(self.daemon))
2,
last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size, size])
- self.busywait(lambda: self.node_setup.start.called,
- lambda: self.stop_proxy(self.daemon))
+ time.sleep(2)
+ self.busywait(lambda: self.node_setup.start.called)
def test_missing_counts_towards_max(self):
size = testutil.MockSize(1)
arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
testutil.arvados_node_mock(4, job_uuid=None)]
self.make_daemon(cloud_nodes, arv_nodes, [size])
+ self.daemon.ping().get(self.TIMEOUT)
self.assertEqual(2, self.alive_monitor_count())
for mon_ref in self.monitor_list():
monitor = mon_ref.proxy()
booting = self.daemon.booting.get()
cloud_nodes = self.daemon.cloud_nodes.get()
- self.stop_proxy(self.daemon)
-
self.busywait(lambda: 1 == self.node_setup.start.call_count)
self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
+ self.stop_proxy(self.daemon)
+
# booting a new big node
sizecounts = {a[0].id: 0 for a in avail_sizes}
for b in booting.itervalues():
import arvnodeman.baseactor
class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
def __init__(self, e, killfunc=None):
    """Create a test actor that stores exception *e* for later use.

    Args:
        e: exception instance kept on ``self.exp``.
        killfunc: optional replacement for the process-kill hook; when
            omitted, the base class picks its own default.
    """
    # Forward killfunc only when the caller supplied one.  Passing
    # killfunc=None through would look like an explicit choice to the base
    # class and displace its default kill function.
    base_kwargs = {} if killfunc is None else {"killfunc": killfunc}
    super(BogusActor, self).__init__(**base_kwargs)
    self.exp = e
def doStuff(self):
class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
def test_fatal_error(self):
    """Each fatal exception type must cause the kill hook to be called."""
    for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
        # Mock's first positional parameter is `spec`; use name= so the
        # string only labels the mock instead of constraining it to str.
        kill_mock = mock.Mock(name='os.kill')
        bgact = BogusActor.start(e, killfunc=kill_mock)
        # Capture the actor's thread while it is still alive so the test
        # can wait for it to finish before checking the mock.
        act_thread = bgact.proxy().get_thread().get()
        act = bgact.tell_proxy()
        # presumably doStuff raises the stored exception -- body not in view
        act.doStuff()
        act.actor_ref.stop(block=True)
        act_thread.join()
        self.assertTrue(kill_mock.called)
+
def test_nonfatal_error(self):
    """A non-fatal OSError (ENOENT) must not trigger the kill hook."""
    # Mock's first positional parameter is `spec`; use name= so the string
    # only labels the mock instead of constraining it to str.
    kill_mock = mock.Mock(name='os.kill')
    bgact = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock)
    # Same sequencing as test_fatal_error: grab the actor thread first and
    # join it before asserting, so the negative check cannot pass merely
    # because the failure handler has not run yet.
    act_thread = bgact.proxy().get_thread().get()
    act = bgact.tell_proxy()
    act.doStuff()
    act.actor_ref.stop(block=True)
    act_thread.join()
    self.assertFalse(kill_mock.called)
pykka.ActorRegistry.stop_all()
def stop_proxy(self, proxy):
    """Stop the actor behind *proxy* and wait for its thread to exit.

    Args:
        proxy: actor proxy exposing ``get_thread()`` and ``actor_ref``.

    Returns:
        Whatever ``proxy.actor_ref.stop(timeout=self.TIMEOUT)`` returns.
    """
    try:
        # Bound the wait so a wedged actor cannot hang the test run.
        th = proxy.get_thread().get(self.TIMEOUT)
    except pykka.ActorDeadError:
        # Actor already stopped: there is no live thread to wait for, and
        # stop() below still reports the result to the caller.
        th = None
    t = proxy.actor_ref.stop(timeout=self.TIMEOUT)
    if th is not None:
        th.join(self.TIMEOUT)
    return t
def wait_for_assignment(self, proxy, attr_name, unassigned=None,
timeout=TIMEOUT):