import errno
import logging
import os
+import signal
+import time
import threading
import traceback
if (exception_type in (threading.ThreadError, MemoryError) or
exception_type is OSError and exception_value.errno == errno.ENOMEM):
lg.critical("Unhandled exception is a fatal error, killing Node Manager")
- os.killpg(os.getpgid(0), 9)
+ os.kill(os.getpid(), signal.SIGKILL)
+
def ping(self):
    """Answer a liveness probe.

    Takes no arguments and unconditionally returns ``True``; a caller
    that receives the reply knows this object was able to handle the call.
    """
    return True
+
+
class WatchdogActor(pykka.ThreadingActor):
    """Actor that repeatedly pings other actors and kills the process on failure.

    Each actor handed to the constructor is pinged in turn.  If any ping
    raises, or its reply does not arrive through ``.get(timeout)``, the
    failure is logged at CRITICAL level and the current process is sent
    SIGKILL.
    """

    # Seconds to sleep between two consecutive rounds of pings.
    POLL_INTERVAL = 20

    def __init__(self, timeout, *args, **kwargs):
        """Store the ping timeout and build a proxy for every watched actor.

        timeout  -- value passed to ``.get()`` on each ping reply.
        *args    -- the actors to watch; ``.proxy()`` is called on each one.
                    They are also forwarded to the base-class constructor.
        **kwargs -- forwarded unchanged to the base-class constructor.
        """
        # Name this class (not pykka.ThreadingActor) in super() so the MRO
        # lookup begins right after WatchdogActor and no base-class
        # initializer can be skipped.
        super(WatchdogActor, self).__init__(*args, **kwargs)
        self.timeout = timeout
        self.actors = [watched.proxy() for watched in args]
        # TellableActorRef is defined elsewhere in the project.
        # NOTE(review): presumably tell_proxy() returns a proxy whose method
        # calls are queued as fire-and-forget messages -- confirm.
        self.actor_ref = TellableActorRef(self)
        self._later = self.actor_ref.tell_proxy()

    def kill_self(self, e, act):
        """Log the failure at CRITICAL level, then SIGKILL this process.

        e   -- the exception that triggered the shutdown.
        act -- the actor proxy being pinged when the exception occurred,
               or None if no actor had been reached yet.
        """
        # Fall back to the logging module when no _logger attribute is set.
        lg = getattr(self, "_logger", logging)
        lg.critical("Watchdog exception", exc_info=e)
        lg.critical("Actor %s watchdog ping time out, killing Node Manager", act)
        os.kill(os.getpid(), signal.SIGKILL)

    def on_start(self):
        """Kick off the first round of pings via the ``_later`` proxy."""
        self._later.run()

    def run(self):
        """Ping every watched actor once, sleep, then schedule the next round.

        Any exception raised while pinging, sleeping or rescheduling is
        handed to ``kill_self`` together with the actor that was being
        pinged at the time.
        """
        current = None
        try:
            for current in self.actors:
                # Wait for the reply using the configured timeout; a missing
                # or failing reply surfaces as an exception here.
                current.ping().get(self.timeout)
            time.sleep(self.POLL_INTERVAL)
            self._later.run()
        except Exception as e:
            # Deliberately broad: any watchdog failure is treated as fatal.
            self.kill_self(e, current)