Merge branch '8931-event-thread-catch-exceptions' closes #8931
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
index 9f5e1627eaa1b1d808c87190dc1c40c2c15f5d3b..78bd2db5cc05fe9516c10e718506ef11734055db 100644 (file)
@@ -57,28 +57,26 @@ def setup_logging(path, level, **sublevels):
         sublogger = logging.getLogger(logger_name)
         sublogger.setLevel(sublevel)
 
-def launch_pollers(config):
-    cloud_client = config.new_cloud_client()
-    arvados_client = config.new_arvados_client()
-    cloud_size_list = config.node_sizes(cloud_client.list_sizes())
+def build_server_calculator(config):
+    cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
     if not cloud_size_list:
         abort("No valid node sizes configured")
+    return ServerCalculator(cloud_size_list,
+                            config.getint('Daemon', 'max_nodes'),
+                            config.getfloat('Daemon', 'max_total_price'))
 
-    server_calculator = ServerCalculator(
-        cloud_size_list,
-        config.getint('Daemon', 'min_nodes'),
-        config.getint('Daemon', 'max_nodes'))
+def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
-    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
+    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
     cloud_node_poller = CloudNodeListMonitorActor.start(
-        cloud_client, timer, poll_time, max_poll_time).proxy()
+        config.new_cloud_client(), timer, poll_time, max_poll_time).tell_proxy()
     arvados_node_poller = ArvadosNodeListMonitorActor.start(
-        arvados_client, timer, poll_time, max_poll_time).proxy()
+        config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
-        poll_time, max_poll_time).proxy()
+        poll_time, max_poll_time).tell_proxy()
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
@@ -105,28 +103,36 @@ def main(args=None):
     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
         signal.signal(sigcode, shutdown_signal)
 
-    setup_logging(config.get('Logging', 'file'), **config.log_levels())
-    node_setup, node_shutdown, node_update, node_monitor = \
-        config.dispatch_classes()
-    timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
-        launch_pollers(config)
-    cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
-    node_daemon = NodeManagerDaemonActor.start(
-        job_queue_poller, arvados_node_poller, cloud_node_poller,
-        cloud_node_updater, timer,
-        config.new_arvados_client, config.new_cloud_client,
-        config.shutdown_windows(),
-        config.getint('Daemon', 'min_nodes'),
-        config.getint('Daemon', 'max_nodes'),
-        config.getint('Daemon', 'poll_stale_after'),
-        config.getint('Daemon', 'node_stale_after'),
-        node_setup, node_shutdown, node_monitor).proxy()
-
-    signal.pause()
-    daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
-    while not daemon_stopped():
-        time.sleep(1)
-    pykka.ActorRegistry.stop_all()
+    try:
+        setup_logging(config.get('Logging', 'file'), **config.log_levels())
+        node_setup, node_shutdown, node_update, node_monitor = \
+            config.dispatch_classes()
+        server_calculator = build_server_calculator(config)
+        timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+            launch_pollers(config, server_calculator)
+        cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
+        node_daemon = NodeManagerDaemonActor.start(
+            job_queue_poller, arvados_node_poller, cloud_node_poller,
+            cloud_node_updater, timer,
+            config.new_arvados_client, config.new_cloud_client,
+            config.shutdown_windows(),
+            server_calculator,
+            config.getint('Daemon', 'min_nodes'),
+            config.getint('Daemon', 'max_nodes'),
+            config.getint('Daemon', 'poll_stale_after'),
+            config.getint('Daemon', 'boot_fail_after'),
+            config.getint('Daemon', 'node_stale_after'),
+            node_setup, node_shutdown, node_monitor,
+            max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
+
+        signal.pause()
+        daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+        while not daemon_stopped():
+            time.sleep(1)
+    except Exception:
+        logging.exception("Uncaught exception during setup")
+    finally:
+        pykka.ActorRegistry.stop_all()
 
 
 if __name__ == '__main__':