2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
18 from . import config as nmconfig
20 from .baseactor import WatchdogActor
21 from .daemon import NodeManagerDaemonActor
22 from .jobqueue import JobQueueMonitorActor, ServerCalculator
23 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
24 from .timedcallback import TimedCallBackActor
25 from ._version import __version__
def abort(msg, code=1):
    """Print *msg* prefixed with the program name and exit with *code*.

    Used for fatal startup errors (missing/unreadable configuration, no
    usable node sizes).  It never returns, so callers may assume execution
    stops here.

    :param msg: human-readable error description.
    :param code: process exit status (default 1).
    :raises SystemExit: always, carrying *code*.
    """
    import sys

    print("arvados-node-manager: " + msg)
    sys.exit(code)
def parse_cli(args):
    """Parse the arvados-node-manager command line.

    Recognized options: ``--version`` (print the version and exit),
    ``--foreground`` (do not daemonize) and ``--config PATH``.

    :param args: argument strings to parse; argparse falls back to
        ``sys.argv[1:]`` when this is None.
    :returns: an ``argparse.Namespace`` with ``foreground`` and ``config``.
    """
    cli = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    version_text = "%s %s" % (sys.argv[0], __version__)
    cli.add_argument('--version', action='version', version=version_text,
                     help='Print version and exit.')
    cli.add_argument('--foreground', action='store_true', default=False,
                     help="Run in the foreground. Don't daemonize.")
    cli.add_argument('--config', help="Path to configuration file")
    return cli.parse_args(args)
def load_config(path):
    """Load the node manager configuration file at *path*.

    Calls ``abort`` with exit code 2 when no path was given, and with its
    default code when the file cannot be opened or read.

    :param path: path to the configuration file as given by ``--config``;
        None when the option was omitted.
    :returns: the populated ``nmconfig.NodeManagerConfig``.
    """
    # `not path` also rejects an empty string, which open() would fail on.
    if not path:
        abort("No --config file specified", 2)
    config = nmconfig.NodeManagerConfig()
    try:
        with open(path) as config_file:
            config.readfp(config_file)
    except (IOError, OSError) as error:
        abort("Error reading configuration file {}: {}".format(path, error))
    return config
def setup_logging(path, level, **sublevels):
    """Log to the file at *path* and set logger levels.

    Attaches a ``logging.FileHandler`` for *path* to the root logger, sets
    the root logger's level to *level*, and sets the level of every logger
    named by a keyword argument to that argument's value.

    :param path: log file path (opened immediately by FileHandler).
    :param level: level for the root logger.
    :param sublevels: ``logger_name=level`` overrides for specific loggers.
    :returns: the root logger, so callers can log right away.
    """
    handler = logging.FileHandler(path)
    # NOTE(review): no datefmt is passed, so %(asctime)s uses logging's
    # default timestamp format — confirm this is intended.
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # .items() works on Python 2 and 3; .iteritems() exists only on Python 2.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
    return root_logger
def build_server_calculator(config):
    """Create the ServerCalculator from the configured node sizes.

    Calls ``abort`` when the configuration yields no node sizes. Otherwise
    the calculator is constructed with the [Daemon] ``max_nodes``,
    ``max_total_price`` and ``node_mem_scaling`` settings.
    """
    sizes = config.node_sizes()
    if not sizes:
        abort("No valid node sizes configured")
    section = 'Daemon'
    max_nodes = config.getint(section, 'max_nodes')
    max_price = config.getfloat(section, 'max_total_price')
    mem_scaling = config.getfloat(section, 'node_mem_scaling')
    return ServerCalculator(sizes, max_nodes, max_price, mem_scaling)
def launch_pollers(config, server_calculator):
    """Start the shared timer actor and the three polling actors.

    Each poller reads its own ``*_poll_time`` option from [Daemon] and falls
    back to ``poll_time`` when that option is falsy (e.g. 0). The timer
    actor is started with ``poll_time / 10.0``.

    :returns: ``(timer, cloud_node_poller, arvados_node_poller,
        job_queue_poller)``, each a pykka tell-proxy.
    """
    def daemon_float(option):
        return config.getfloat('Daemon', option)

    poll_time = daemon_float('poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')
    cloudlist_poll_time = daemon_float('cloudlist_poll_time') or poll_time
    nodelist_poll_time = daemon_float('nodelist_poll_time') or poll_time
    wishlist_poll_time = daemon_float('wishlist_poll_time') or poll_time

    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()

    cloud_client = config.new_cloud_client()
    cloud_node_poller = CloudNodeListMonitorActor.start(
        cloud_client, timer, server_calculator,
        cloudlist_poll_time, max_poll_time).tell_proxy()

    node_list_client = config.new_arvados_client()
    arvados_node_poller = ArvadosNodeListMonitorActor.start(
        node_list_client, timer,
        nodelist_poll_time, max_poll_time).tell_proxy()

    queue_client = config.new_arvados_client()
    jobs_queue = config.getboolean('Arvados', 'jobs_queue')
    slurm_queue = config.getboolean('Arvados', 'slurm_queue')
    job_queue_poller = JobQueueMonitorActor.start(
        queue_client, timer, server_calculator, jobs_queue, slurm_queue,
        wishlist_poll_time, max_poll_time).tell_proxy()

    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
104 def shutdown_signal(signal_code, frame):
# Signal handler; the startup code registers it for SIGINT, SIGQUIT and
# SIGTERM. Repeated deliveries of the same signal escalate the shutdown:
#   - node_daemon not started yet: stop every pykka actor and exit at once;
#   - first delivery: ask the node daemon actor to shut down;
#   - second delivery: stop every pykka actor;
#   - sys.exit(-signal_code) ends the process.
# `frame` (the interrupted stack frame from the signal module) is not used
# in the visible lines.
# NOTE(review): this listing has lost its indentation and some lines, so
# it cannot be determined here whether the final sys.exit belongs to the
# "current_count == 1" branch or to a separate else branch (third
# delivery). Confirm against the real source.
105 current_count = _caught_signals.get(signal_code, 0)
# _caught_signals is presumably a module-level dict (defined outside this
# view) counting deliveries per signal number.
106 _caught_signals[signal_code] = current_count + 1
107 if node_daemon is None:
# The daemon actor was never started, so there is nothing to wind down.
108 pykka.ActorRegistry.stop_all()
109 sys.exit(-signal_code)
110 elif current_count == 0:
# First delivery: request shutdown from the daemon actor.
112 node_daemon.shutdown()
113 elif current_count == 1:
# Second delivery: stop all actors.
114 pykka.ActorRegistry.stop_all()
116 sys.exit(-signal_code)
# Entry-point body: parse the command line, load the configuration,
# optionally daemonize, install signal handlers, start the node manager
# actors, then wait for the daemon actor to stop.
# NOTE(review): this listing has lost its indentation and several lines
# (including the enclosing def header and the try/except/finally
# structure), so the nesting below is inferred. Confirm against the real
# source.
# Rebind the module-level names so shutdown_signal, which checks
# node_daemon, sees the actors started below.
119 global node_daemon, watchdog
120 args = parse_cli(args)
121 config = load_config(args.config)
# Daemonize unless --foreground was given.
123 if not args.foreground:
124 daemon.DaemonContext().open()
# Route SIGINT, SIGQUIT and SIGTERM to shutdown_signal.
125 for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
126 signal.signal(sigcode, shutdown_signal)
128 status.Server(config).start()
# Log to the [Logging] file with per-logger levels, then record the
# program and libcloud versions.
131 root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
132 root_logger.info("%s %s started, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
# Node setup/shutdown/update/monitor actor classes chosen by the config.
133 node_setup, node_shutdown, node_update, node_monitor = \
134 config.dispatch_classes()
135 server_calculator = build_server_calculator(config)
136 timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
137 launch_pollers(config, server_calculator)
# The updater receives the client factory itself (not an instance).
138 cloud_node_updater = node_update.start(config.new_cloud_client, timer).tell_proxy()
# Main daemon actor. The Arvados and cloud client factories are passed
# uncalled, unlike the pollers, which receive client instances.
# NOTE(review): the listing appears to be missing a positional argument
# between shutdown_windows() and min_nodes. Confirm this argument list
# against NodeManagerDaemonActor's signature.
139 node_daemon = NodeManagerDaemonActor.start(
140 job_queue_poller, arvados_node_poller, cloud_node_poller,
141 cloud_node_updater, timer,
142 config.new_arvados_client, config.new_cloud_client,
143 config.shutdown_windows(),
145 config.getint('Daemon', 'min_nodes'),
146 config.getint('Daemon', 'max_nodes'),
147 config.getint('Daemon', 'poll_stale_after'),
148 config.getint('Daemon', 'boot_fail_after'),
149 config.getint('Daemon', 'node_stale_after'),
150 node_setup, node_shutdown, node_monitor,
151 max_total_price=config.getfloat('Daemon', 'max_total_price'),
152 consecutive_idle_count=config.getint('Daemon', 'consecutive_idle_count'),).tell_proxy()
# Watchdog over the three pollers and the daemon actor, configured by the
# [Daemon] watchdog option.
154 watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
155 cloud_node_poller.actor_ref,
156 arvados_node_poller.actor_ref,
157 job_queue_poller.actor_ref,
158 node_daemon.actor_ref)
# Wait until the daemon actor has stopped; the loop body is not visible
# in this listing.
161 daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
162 while not daemon_stopped():
# Presumably inside an except handler (not visible): log any exception
# raised during startup.
165 logging.exception("Uncaught exception during setup")
# Stop every remaining pykka actor; presumably in a finally clause.
# Confirm.
167 pykka.ActorRegistry.stop_all()
170 if __name__ == '__main__':