Merge branch 'master' into 11898-no-distinct
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
index 78bd2db5cc05fe9516c10e718506ef11734055db..d85ef552c064ac0b9f4527c3b93194ede60c142f 100644 (file)
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
 
 from __future__ import absolute_import, print_function
 
@@ -10,14 +13,19 @@ import time
 
 import daemon
 import pykka
 
 import daemon
 import pykka
+import libcloud
 
 from . import config as nmconfig
 
 from . import config as nmconfig
+from . import status
+from .baseactor import WatchdogActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
 from .timedcallback import TimedCallBackActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
 from .timedcallback import TimedCallBackActor
+from ._version import __version__
 
 node_daemon = None
 
 node_daemon = None
+watchdog = None
 
 def abort(msg, code=1):
     print("arvados-node-manager: " + msg)
 
 def abort(msg, code=1):
     print("arvados-node-manager: " + msg)
@@ -27,6 +35,10 @@ def parse_cli(args):
     parser = argparse.ArgumentParser(
         prog='arvados-node-manager',
         description="Dynamically allocate Arvados cloud compute nodes")
     parser = argparse.ArgumentParser(
         prog='arvados-node-manager',
         description="Dynamically allocate Arvados cloud compute nodes")
+    parser.add_argument(
+        '--version', action='version',
+        version="%s %s" % (sys.argv[0], __version__),
+        help='Print version and exit.')
     parser.add_argument(
         '--foreground', action='store_true', default=False,
         help="Run in the foreground.  Don't daemonize.")
     parser.add_argument(
         '--foreground', action='store_true', default=False,
         help="Run in the foreground.  Don't daemonize.")
@@ -56,6 +68,7 @@ def setup_logging(path, level, **sublevels):
     for logger_name, sublevel in sublevels.iteritems():
         sublogger = logging.getLogger(logger_name)
         sublogger.setLevel(sublevel)
     for logger_name, sublevel in sublevels.iteritems():
         sublogger = logging.getLogger(logger_name)
         sublogger.setLevel(sublevel)
+    return root_logger
 
 def build_server_calculator(config):
     cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
 
 def build_server_calculator(config):
     cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
@@ -63,7 +76,8 @@ def build_server_calculator(config):
         abort("No valid node sizes configured")
     return ServerCalculator(cloud_size_list,
                             config.getint('Daemon', 'max_nodes'),
         abort("No valid node sizes configured")
     return ServerCalculator(cloud_size_list,
                             config.getint('Daemon', 'max_nodes'),
-                            config.getfloat('Daemon', 'max_total_price'))
+                            config.getfloat('Daemon', 'max_total_price'),
+                            config.getfloat('Daemon', 'node_mem_scaling'))
 
 def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
 
 def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
@@ -71,12 +85,15 @@ def launch_pollers(config, server_calculator):
 
     timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
     cloud_node_poller = CloudNodeListMonitorActor.start(
 
     timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
     cloud_node_poller = CloudNodeListMonitorActor.start(
-        config.new_cloud_client(), timer, poll_time, max_poll_time).tell_proxy()
+        config.new_cloud_client(), timer, server_calculator, poll_time, max_poll_time).tell_proxy()
     arvados_node_poller = ArvadosNodeListMonitorActor.start(
         config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
     arvados_node_poller = ArvadosNodeListMonitorActor.start(
         config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
-        poll_time, max_poll_time).tell_proxy()
+        config.getboolean('Arvados', 'jobs_queue'),
+        config.getboolean('Arvados', 'slurm_queue'),
+        poll_time, max_poll_time
+    ).tell_proxy()
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
@@ -87,6 +104,7 @@ def shutdown_signal(signal_code, frame):
         pykka.ActorRegistry.stop_all()
         sys.exit(-signal_code)
     elif current_count == 0:
         pykka.ActorRegistry.stop_all()
         sys.exit(-signal_code)
     elif current_count == 0:
+        watchdog.stop()
         node_daemon.shutdown()
     elif current_count == 1:
         pykka.ActorRegistry.stop_all()
         node_daemon.shutdown()
     elif current_count == 1:
         pykka.ActorRegistry.stop_all()
@@ -94,7 +112,7 @@ def shutdown_signal(signal_code, frame):
         sys.exit(-signal_code)
 
 def main(args=None):
         sys.exit(-signal_code)
 
 def main(args=None):
-    global node_daemon
+    global node_daemon, watchdog
     args = parse_cli(args)
     config = load_config(args.config)
 
     args = parse_cli(args)
     config = load_config(args.config)
 
@@ -103,14 +121,17 @@ def main(args=None):
     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
         signal.signal(sigcode, shutdown_signal)
 
     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
         signal.signal(sigcode, shutdown_signal)
 
+    status.Server(config).start()
+
     try:
     try:
-        setup_logging(config.get('Logging', 'file'), **config.log_levels())
+        root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
+        root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
         node_setup, node_shutdown, node_update, node_monitor = \
             config.dispatch_classes()
         server_calculator = build_server_calculator(config)
         timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
             launch_pollers(config, server_calculator)
         node_setup, node_shutdown, node_update, node_monitor = \
             config.dispatch_classes()
         server_calculator = build_server_calculator(config)
         timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
             launch_pollers(config, server_calculator)
-        cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
+        cloud_node_updater = node_update.start(config.new_cloud_client, timer).tell_proxy()
         node_daemon = NodeManagerDaemonActor.start(
             job_queue_poller, arvados_node_poller, cloud_node_poller,
             cloud_node_updater, timer,
         node_daemon = NodeManagerDaemonActor.start(
             job_queue_poller, arvados_node_poller, cloud_node_poller,
             cloud_node_updater, timer,
@@ -125,6 +146,12 @@ def main(args=None):
             node_setup, node_shutdown, node_monitor,
             max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
 
             node_setup, node_shutdown, node_monitor,
             max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
 
+        watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
+                            cloud_node_poller.actor_ref,
+                            arvados_node_poller.actor_ref,
+                            job_queue_poller.actor_ref,
+                            node_daemon.actor_ref)
+
         signal.pause()
         daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
         while not daemon_stopped():
         signal.pause()
         daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
         while not daemon_stopped():