11807: Migrate old records in jobs table from YAML to JSON.
[arvados.git] / services / nodemanager / arvnodeman / baseactor.py
index 9591b42ca3e3a0f38060222124fabccecc7fe4a9..68ea97ab75aac1b71aca36427165ed54cef97cc2 100644 (file)
@@ -3,6 +3,8 @@ from __future__ import absolute_import, print_function
 import errno
 import logging
 import os
+import signal
+import time
 import threading
 import traceback
 
@@ -82,4 +84,35 @@ class BaseNodeManagerActor(pykka.ThreadingActor):
         if (exception_type in (threading.ThreadError, MemoryError) or
             exception_type is OSError and exception_value.errno == errno.ENOMEM):
             lg.critical("Unhandled exception is a fatal error, killing Node Manager")
-            os.killpg(os.getpgid(0), 9)
+            os.kill(os.getpid(), signal.SIGKILL)
+
+    def ping(self):
+        """Liveness probe: unconditionally answer True.
+
+        WatchdogActor calls this through a proxy and waits for the reply, so
+        the return value itself carries no information; getting an answer
+        back at all is what the caller checks for.
+        """
+        return True
+
+
+class WatchdogActor(pykka.ThreadingActor):
+    """Kill the whole process when a watched actor stops answering pings.
+
+    Every positional argument after ``timeout`` is treated as an actor
+    reference: it is turned into a proxy and its ``ping()`` method is called
+    on each monitoring round.  If a ping raises, or its future's ``get()``
+    raises when given ``timeout``, the current process is sent SIGKILL.
+    """
+
+    def __init__(self, timeout, *args, **kwargs):
+        """Store the ping timeout and build proxies for the watched actors.
+
+        timeout -- value handed to each ping future's ``get()``.
+        *args   -- references of the actors to watch (each must offer
+                   ``proxy()`` and answer ``ping()``).
+        """
+        # Name this class (not its parent) in super() so that the lookup
+        # starts right after WatchdogActor in the MRO and no base-class
+        # initializer is skipped.
+        super(WatchdogActor, self).__init__(*args, **kwargs)
+        self.timeout = timeout
+        self.actors = [a.proxy() for a in args]
+        # Replace the default actor_ref so a tell-style proxy of ourselves is
+        # available; run() uses it to re-schedule itself.
+        self.actor_ref = TellableActorRef(self)
+        self._later = self.actor_ref.tell_proxy()
+
+    def kill_self(self, e, act):
+        """Log the failure that tripped the watchdog, then SIGKILL this process.
+
+        e   -- the exception caught while pinging.
+        act -- the proxy being pinged when the failure happened, or None if
+               the failure occurred before any actor was reached.
+        """
+        # Fall back to the logging module itself when no per-actor logger
+        # has been attached to this instance.
+        lg = getattr(self, "_logger", logging)
+        lg.critical("Watchdog exception", exc_info=e)
+        lg.critical("Actor %s watchdog ping time out, killing Node Manager", act)
+        os.kill(os.getpid(), signal.SIGKILL)
+
+    def on_start(self):
+        """Queue the first monitoring round as soon as the actor starts."""
+        self._later.run()
+
+    def run(self):
+        """Ping every watched actor once, pause, then queue the next round."""
+        # Remember which actor is being pinged so a failure can be attributed
+        # to it in the log; stays None if the loop never starts.
+        current = None
+        try:
+            for current in self.actors:
+                current.ping().get(self.timeout)
+            # Pause between rounds; this blocks the watchdog's own thread.
+            time.sleep(20)
+            # Re-queue through the tell proxy instead of looping in place.
+            self._later.run()
+        except Exception as e:
+            self.kill_self(e, current)