Merge branch 'master' into 9369-arv-cwl-docs
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
index 5fa404fcbbdf4df95d435ab2a73afbc2abe7bc97..1be7e46387ff6c5bfe38d4e4805694fb7986cfa7 100644 (file)
@@ -12,6 +12,7 @@ import daemon
 import pykka
 
 from . import config as nmconfig
 import pykka
 
 from . import config as nmconfig
+from .baseactor import WatchdogActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
@@ -57,28 +58,26 @@ def setup_logging(path, level, **sublevels):
         sublogger = logging.getLogger(logger_name)
         sublogger.setLevel(sublevel)
 
         sublogger = logging.getLogger(logger_name)
         sublogger.setLevel(sublevel)
 
-def launch_pollers(config):
-    cloud_client = config.new_cloud_client()
-    arvados_client = config.new_arvados_client()
-    cloud_size_list = config.node_sizes(cloud_client.list_sizes())
+def build_server_calculator(config):
+    cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
     if not cloud_size_list:
         abort("No valid node sizes configured")
     if not cloud_size_list:
         abort("No valid node sizes configured")
+    return ServerCalculator(cloud_size_list,
+                            config.getint('Daemon', 'max_nodes'),
+                            config.getfloat('Daemon', 'max_total_price'))
 
 
-    server_calculator = ServerCalculator(
-        cloud_size_list,
-        config.getint('Daemon', 'min_nodes'),
-        config.getint('Daemon', 'max_nodes'))
+def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
     poll_time = config.getint('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
-    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
+    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
     cloud_node_poller = CloudNodeListMonitorActor.start(
     cloud_node_poller = CloudNodeListMonitorActor.start(
-        cloud_client, timer, poll_time, max_poll_time).proxy()
+        config.new_cloud_client(), timer, poll_time, max_poll_time).tell_proxy()
     arvados_node_poller = ArvadosNodeListMonitorActor.start(
     arvados_node_poller = ArvadosNodeListMonitorActor.start(
-        arvados_client, timer, poll_time, max_poll_time).proxy()
+        config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
-        poll_time, max_poll_time).proxy()
+        poll_time, max_poll_time).tell_proxy()
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
@@ -105,29 +104,42 @@ def main(args=None):
     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
         signal.signal(sigcode, shutdown_signal)
 
     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
         signal.signal(sigcode, shutdown_signal)
 
-    setup_logging(config.get('Logging', 'file'), **config.log_levels())
-    node_setup, node_shutdown, node_update, node_monitor = \
-        config.dispatch_classes()
-    timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
-        launch_pollers(config)
-    cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
-    node_daemon = NodeManagerDaemonActor.start(
-        job_queue_poller, arvados_node_poller, cloud_node_poller,
-        cloud_node_updater, timer,
-        config.new_arvados_client, config.new_cloud_client,
-        config.shutdown_windows(),
-        config.getint('Daemon', 'min_nodes'),
-        config.getint('Daemon', 'max_nodes'),
-        config.getint('Daemon', 'poll_stale_after'),
-        config.getint('Daemon', 'boot_fail_after'),
-        config.getint('Daemon', 'node_stale_after'),
-        node_setup, node_shutdown, node_monitor).proxy()
-
-    signal.pause()
-    daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
-    while not daemon_stopped():
-        time.sleep(1)
-    pykka.ActorRegistry.stop_all()
+    try:
+        setup_logging(config.get('Logging', 'file'), **config.log_levels())
+        node_setup, node_shutdown, node_update, node_monitor = \
+            config.dispatch_classes()
+        server_calculator = build_server_calculator(config)
+        timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+            launch_pollers(config, server_calculator)
+        cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
+        node_daemon = NodeManagerDaemonActor.start(
+            job_queue_poller, arvados_node_poller, cloud_node_poller,
+            cloud_node_updater, timer,
+            config.new_arvados_client, config.new_cloud_client,
+            config.shutdown_windows(),
+            server_calculator,
+            config.getint('Daemon', 'min_nodes'),
+            config.getint('Daemon', 'max_nodes'),
+            config.getint('Daemon', 'poll_stale_after'),
+            config.getint('Daemon', 'boot_fail_after'),
+            config.getint('Daemon', 'node_stale_after'),
+            node_setup, node_shutdown, node_monitor,
+            max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
+
+        WatchdogActor.start(config.getint('Daemon', 'watchdog'),
+                            cloud_node_poller.actor_ref,
+                            arvados_node_poller.actor_ref,
+                            job_queue_poller.actor_ref,
+                            node_daemon.actor_ref)
+
+        signal.pause()
+        daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+        while not daemon_stopped():
+            time.sleep(1)
+    except Exception:
+        logging.exception("Uncaught exception during setup")
+    finally:
+        pykka.ActorRegistry.stop_all()
 
 
 if __name__ == '__main__':
 
 
 if __name__ == '__main__':