2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
18 from . import config as nmconfig
20 from .baseactor import WatchdogActor
21 from .daemon import NodeManagerDaemonActor
22 from .jobqueue import JobQueueMonitorActor, ServerCalculator
23 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
24 from .timedcallback import TimedCallBackActor
25 from ._version import __version__
def abort(msg, code=1):
    """Print an error message and terminate the process.

    Arguments:
      msg: text printed after the "arvados-node-manager: " prefix.
      code: exit status handed to sys.exit (default 1).

    Raises:
      SystemExit: always, carrying `code`.
    """
    print("arvados-node-manager: " + msg)
    # Callers (e.g. build_server_calculator) carry on with unusable state
    # right after calling abort(), so this function must never return.
    sys.exit(code)
def parse_cli(args):
    """Parse the arvados-node-manager command line.

    Arguments:
      args: list of argument strings passed to ArgumentParser.parse_args
        (argparse reads sys.argv[1:] itself when this is None).

    Returns the argparse namespace; it carries `foreground` (bool) and
    `config` (path string or None).  `--version` prints the program name
    and version, then exits.
    """
    parser = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    parser.add_argument(
        '--version', action='version',
        version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    parser.add_argument(
        '--foreground', action='store_true', default=False,
        help="Run in the foreground. Don't daemonize.")
    parser.add_argument(
        '--config', help="Path to configuration file")
    return parser.parse_args(args)
def load_config(path):
    """Read the Node Manager configuration file at `path`.

    Arguments:
      path: filesystem path of the configuration file.  A false value
        (None or an empty string) is reported through abort() with code 2.

    Returns the nmconfig.NodeManagerConfig object after feeding it the
    file.  A failure to open or read the file is reported through abort()
    with its default code.
    """
    if not path:
        abort("No --config file specified", 2)
    config = nmconfig.NodeManagerConfig()
    try:
        with open(path) as config_file:
            config.readfp(config_file)
    except (IOError, OSError) as error:
        abort("Error reading configuration file {}: {}".format(path, error))
    return config
def setup_logging(path, level, **sublevels):
    """Attach a file handler to the root logger and set logger levels.

    Arguments:
      path: file that a logging.FileHandler writes records to.
      level: level applied to the root logger.
      **sublevels: logger name -> level; each named logger gets that level.

    Returns the root logger.
    """
    handler = logging.FileHandler(path)
    # NOTE(review): the format string was followed by a trailing comma, so
    # a second Formatter argument (presumably a date format) may belong
    # here -- confirm.  Until then the logging default timestamp is used.
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # items() instead of the Python 2-only iteritems(): same pairs, and it
    # also works on Python 3.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
    return root_logger
def build_server_calculator(config):
    """Create the ServerCalculator described by `config`.

    The size list reported by a fresh cloud client is passed through
    config.node_sizes(); an empty result is reported via abort().  The
    calculator then receives that list plus the [Daemon] settings
    max_nodes, max_total_price and node_mem_scaling, in that order.
    """
    available_sizes = config.new_cloud_client().list_sizes()
    cloud_size_list = config.node_sizes(available_sizes)
    if not cloud_size_list:
        abort("No valid node sizes configured")

    # Read the settings in the same order the calculator takes them.
    max_nodes = config.getint('Daemon', 'max_nodes')
    max_total_price = config.getfloat('Daemon', 'max_total_price')
    node_mem_scaling = config.getfloat('Daemon', 'node_mem_scaling')
    return ServerCalculator(
        cloud_size_list, max_nodes, max_total_price, node_mem_scaling)
def launch_pollers(config, server_calculator):
    """Start the timer actor and the three list/queue polling actors.

    Arguments:
      config: configuration object supplying the [Daemon] poll_time and
        max_poll_time integers, the [Arvados] jobs_queue and slurm_queue
        booleans, and the cloud/Arvados client factories.
      server_calculator: passed to the cloud node and job queue pollers.

    Returns the tuple (timer, cloud_node_poller, arvados_node_poller,
    job_queue_poller); each element is the tell_proxy() of a started actor.
    """
    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')

    # The timer actor is started with a tenth of the poll interval.
    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
    cloud_node_poller = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, server_calculator,
        poll_time, max_poll_time).tell_proxy()
    arvados_node_poller = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer,
        poll_time, max_poll_time).tell_proxy()
    # Closed with .tell_proxy() like its siblings: the caller reads
    # job_queue_poller.actor_ref exactly as it does for the other pollers.
    job_queue_poller = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        config.getboolean('Arvados', 'jobs_queue'),
        config.getboolean('Arvados', 'slurm_queue'),
        poll_time, max_poll_time
    ).tell_proxy()
    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates each time the same signal repeats.

    Arguments:
      signal_code: number of the delivered signal; also used (negated) as
        the exit status.
      frame: current stack frame supplied by the signal module; unused.

    Deliveries are counted per signal in the module-level _caught_signals
    mapping:
      * node_daemon not created yet: stop every actor and exit at once.
      * first delivery: ask node_daemon to shut down.
      * second delivery: stop every registered actor.
      * any later delivery: exit immediately.

    Raises:
      SystemExit: on the immediate-exit paths, with status -signal_code.
    """
    current_count = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = current_count + 1
    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
    elif current_count == 0:
        # NOTE(review): the entry point also publishes a global `watchdog`;
        # confirm whether it should be stopped here before the daemon is
        # asked to shut down.
        node_daemon.shutdown()
    elif current_count == 1:
        pykka.ActorRegistry.stop_all()
    else:
        # Kept as its own branch so a later signal can still end the
        # process even if stopping the actors never completes.
        sys.exit(-signal_code)
115 global node_daemon, watchdog
116 args = parse_cli(args)
117 config = load_config(args.config)
119 if not args.foreground:
120 daemon.DaemonContext().open()
121 for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
122 signal.signal(sigcode, shutdown_signal)
124 status.Server(config).start()
127 root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
128 root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
129 node_setup, node_shutdown, node_update, node_monitor = \
130 config.dispatch_classes()
131 server_calculator = build_server_calculator(config)
132 timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
133 launch_pollers(config, server_calculator)
134 cloud_node_updater = node_update.start(config.new_cloud_client, timer).tell_proxy()
135 node_daemon = NodeManagerDaemonActor.start(
136 job_queue_poller, arvados_node_poller, cloud_node_poller,
137 cloud_node_updater, timer,
138 config.new_arvados_client, config.new_cloud_client,
139 config.shutdown_windows(),
141 config.getint('Daemon', 'min_nodes'),
142 config.getint('Daemon', 'max_nodes'),
143 config.getint('Daemon', 'poll_stale_after'),
144 config.getint('Daemon', 'boot_fail_after'),
145 config.getint('Daemon', 'node_stale_after'),
146 node_setup, node_shutdown, node_monitor,
147 max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
149 watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
150 cloud_node_poller.actor_ref,
151 arvados_node_poller.actor_ref,
152 job_queue_poller.actor_ref,
153 node_daemon.actor_ref)
156 daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
157 while not daemon_stopped():
160 logging.exception("Uncaught exception during setup")
162 pykka.ActorRegistry.stop_all()
165 if __name__ == '__main__':