Merge branch '8784-dir-listings'
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
index 880158234da668ca212604252a070b8db30cddbd..d85ef552c064ac0b9f4527c3b93194ede60c142f 100644 (file)
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
@@ -10,14 +13,19 @@ import time
 
 import daemon
 import pykka
+import libcloud
 
 from . import config as nmconfig
+from . import status
+from .baseactor import WatchdogActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
 from .timedcallback import TimedCallBackActor
+from ._version import __version__
 
 node_daemon = None
+watchdog = None
 
 def abort(msg, code=1):
     print("arvados-node-manager: " + msg)
@@ -27,6 +35,10 @@ def parse_cli(args):
     parser = argparse.ArgumentParser(
         prog='arvados-node-manager',
         description="Dynamically allocate Arvados cloud compute nodes")
+    parser.add_argument(
+        '--version', action='version',
+        version="%s %s" % (sys.argv[0], __version__),
+        help='Print version and exit.')
     parser.add_argument(
         '--foreground', action='store_true', default=False,
         help="Run in the foreground.  Don't daemonize.")
@@ -56,26 +68,32 @@ def setup_logging(path, level, **sublevels):
     for logger_name, sublevel in sublevels.iteritems():
         sublogger = logging.getLogger(logger_name)
         sublogger.setLevel(sublevel)
+    return root_logger
 
 def build_server_calculator(config):
     cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
     if not cloud_size_list:
         abort("No valid node sizes configured")
     return ServerCalculator(cloud_size_list,
-                            config.getint('Daemon', 'max_nodes'))
+                            config.getint('Daemon', 'max_nodes'),
+                            config.getfloat('Daemon', 'max_total_price'),
+                            config.getfloat('Daemon', 'node_mem_scaling'))
 
 def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
-    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
+    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
     cloud_node_poller = CloudNodeListMonitorActor.start(
-        config.new_cloud_client(), timer, poll_time, max_poll_time).proxy()
+        config.new_cloud_client(), timer, server_calculator, poll_time, max_poll_time).tell_proxy()
     arvados_node_poller = ArvadosNodeListMonitorActor.start(
-        config.new_arvados_client(), timer, poll_time, max_poll_time).proxy()
+        config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
-        poll_time, max_poll_time).proxy()
+        config.getboolean('Arvados', 'jobs_queue'),
+        config.getboolean('Arvados', 'slurm_queue'),
+        poll_time, max_poll_time
+    ).tell_proxy()
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
@@ -86,6 +104,7 @@ def shutdown_signal(signal_code, frame):
         pykka.ActorRegistry.stop_all()
         sys.exit(-signal_code)
     elif current_count == 0:
+        watchdog.stop()
         node_daemon.shutdown()
     elif current_count == 1:
         pykka.ActorRegistry.stop_all()
@@ -93,7 +112,7 @@ def shutdown_signal(signal_code, frame):
         sys.exit(-signal_code)
 
 def main(args=None):
-    global node_daemon
+    global node_daemon, watchdog
     args = parse_cli(args)
     config = load_config(args.config)
 
@@ -102,31 +121,45 @@ def main(args=None):
     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
         signal.signal(sigcode, shutdown_signal)
 
-    setup_logging(config.get('Logging', 'file'), **config.log_levels())
-    node_setup, node_shutdown, node_update, node_monitor = \
-        config.dispatch_classes()
-    server_calculator = build_server_calculator(config)
-    timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
-        launch_pollers(config, server_calculator)
-    cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
-    node_daemon = NodeManagerDaemonActor.start(
-        job_queue_poller, arvados_node_poller, cloud_node_poller,
-        cloud_node_updater, timer,
-        config.new_arvados_client, config.new_cloud_client,
-        config.shutdown_windows(),
-        server_calculator.cheapest_size(),
-        config.getint('Daemon', 'min_nodes'),
-        config.getint('Daemon', 'max_nodes'),
-        config.getint('Daemon', 'poll_stale_after'),
-        config.getint('Daemon', 'boot_fail_after'),
-        config.getint('Daemon', 'node_stale_after'),
-        node_setup, node_shutdown, node_monitor).proxy()
-
-    signal.pause()
-    daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
-    while not daemon_stopped():
-        time.sleep(1)
-    pykka.ActorRegistry.stop_all()
+    status.Server(config).start()
+
+    try:
+        root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
+        root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
+        node_setup, node_shutdown, node_update, node_monitor = \
+            config.dispatch_classes()
+        server_calculator = build_server_calculator(config)
+        timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+            launch_pollers(config, server_calculator)
+        cloud_node_updater = node_update.start(config.new_cloud_client, timer).tell_proxy()
+        node_daemon = NodeManagerDaemonActor.start(
+            job_queue_poller, arvados_node_poller, cloud_node_poller,
+            cloud_node_updater, timer,
+            config.new_arvados_client, config.new_cloud_client,
+            config.shutdown_windows(),
+            server_calculator,
+            config.getint('Daemon', 'min_nodes'),
+            config.getint('Daemon', 'max_nodes'),
+            config.getint('Daemon', 'poll_stale_after'),
+            config.getint('Daemon', 'boot_fail_after'),
+            config.getint('Daemon', 'node_stale_after'),
+            node_setup, node_shutdown, node_monitor,
+            max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
+
+        watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
+                            cloud_node_poller.actor_ref,
+                            arvados_node_poller.actor_ref,
+                            job_queue_poller.actor_ref,
+                            node_daemon.actor_ref)
+
+        signal.pause()
+        daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+        while not daemon_stopped():
+            time.sleep(1)
+    except Exception:
+        logging.exception("Uncaught exception during setup")
+    finally:
+        pykka.ActorRegistry.stop_all()
 
 
 if __name__ == '__main__':