3 from __future__ import absolute_import, print_function
15 from . import config as nmconfig
17 from .baseactor import WatchdogActor
18 from .daemon import NodeManagerDaemonActor
19 from .jobqueue import JobQueueMonitorActor, ServerCalculator
20 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
21 from .timedcallback import TimedCallBackActor
22 from ._version import __version__
26 def abort(msg, code=1):
27 print("arvados-node-manager: " + msg)
31 parser = argparse.ArgumentParser(
32 prog='arvados-node-manager',
33 description="Dynamically allocate Arvados cloud compute nodes")
35 '--version', action='version',
36 version="%s %s" % (sys.argv[0], __version__),
37 help='Print version and exit.')
39 '--foreground', action='store_true', default=False,
40 help="Run in the foreground. Don't daemonize.")
42 '--config', help="Path to configuration file")
43 return parser.parse_args(args)
45 def load_config(path):
47 abort("No --config file specified", 2)
48 config = nmconfig.NodeManagerConfig()
50 with open(path) as config_file:
51 config.readfp(config_file)
52 except (IOError, OSError) as error:
53 abort("Error reading configuration file {}: {}".format(path, error))
56 def setup_logging(path, level, **sublevels):
57 handler = logging.FileHandler(path)
58 handler.setFormatter(logging.Formatter(
59 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
61 root_logger = logging.getLogger()
62 root_logger.addHandler(handler)
63 root_logger.setLevel(level)
64 for logger_name, sublevel in sublevels.iteritems():
65 sublogger = logging.getLogger(logger_name)
66 sublogger.setLevel(sublevel)
def build_server_calculator(config):
    """Create the ServerCalculator described by the configuration.

    The sizes reported by a freshly created cloud client are passed through
    ``config.node_sizes()``.  If the result is empty, ``abort()`` is called
    with an explanatory message.  The limits handed to ServerCalculator are
    read from the ``Daemon`` section of the configuration: ``max_nodes``
    (int), ``max_total_price`` (float) and ``node_mem_scaling`` (float).

    Arguments:
      config -- configuration object providing new_cloud_client(),
                node_sizes(), getint() and getfloat().

    Returns the new ServerCalculator instance.
    """
    available_sizes = config.new_cloud_client().list_sizes()
    cloud_size_list = config.node_sizes(available_sizes)
    if not cloud_size_list:
        abort("No valid node sizes configured")
    # Read the limits in the same order they are passed to the constructor.
    node_limit = config.getint('Daemon', 'max_nodes')
    price_limit = config.getfloat('Daemon', 'max_total_price')
    mem_scaling = config.getfloat('Daemon', 'node_mem_scaling')
    return ServerCalculator(
        cloud_size_list, node_limit, price_limit, mem_scaling)
def launch_pollers(config, server_calculator):
    """Start the timer actor and the three list-monitoring actors.

    ``poll_time`` and ``max_poll_time`` are read as integers from the
    ``Daemon`` section of the configuration and passed to every monitor.
    The timer actor is started with ``poll_time / 10.0``.

    Arguments:
      config -- configuration object providing getint(),
                new_cloud_client() and new_arvados_client().
      server_calculator -- passed to the cloud node and job queue monitors.

    Returns a 4-tuple of tell proxies, in this order:
    (timer, cloud_node_poller, arvados_node_poller, job_queue_poller).
    """
    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')
    poll_limits = (poll_time, max_poll_time)

    timer_ref = TimedCallBackActor.start(poll_time / 10.0)
    timer = timer_ref.tell_proxy()

    # Each monitor gets its own newly created client and shares the timer.
    cloud_ref = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, server_calculator, *poll_limits)
    cloud_node_poller = cloud_ref.tell_proxy()

    arvados_ref = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer, *poll_limits)
    arvados_node_poller = arvados_ref.tell_proxy()

    queue_ref = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator, *poll_limits)
    job_queue_poller = queue_ref.tell_proxy()

    return (timer, cloud_node_poller, arvados_node_poller, job_queue_poller)
93 def shutdown_signal(signal_code, frame):
94 current_count = _caught_signals.get(signal_code, 0)
95 _caught_signals[signal_code] = current_count + 1
96 if node_daemon is None:
97 pykka.ActorRegistry.stop_all()
98 sys.exit(-signal_code)
99 elif current_count == 0:
100 node_daemon.shutdown()
101 elif current_count == 1:
102 pykka.ActorRegistry.stop_all()
104 sys.exit(-signal_code)
108 args = parse_cli(args)
109 config = load_config(args.config)
111 if not args.foreground:
112 daemon.DaemonContext().open()
113 for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
114 signal.signal(sigcode, shutdown_signal)
116 status.Server(config).start()
119 root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
120 root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
121 node_setup, node_shutdown, node_update, node_monitor = \
122 config.dispatch_classes()
123 server_calculator = build_server_calculator(config)
124 timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
125 launch_pollers(config, server_calculator)
126 cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
127 node_daemon = NodeManagerDaemonActor.start(
128 job_queue_poller, arvados_node_poller, cloud_node_poller,
129 cloud_node_updater, timer,
130 config.new_arvados_client, config.new_cloud_client,
131 config.shutdown_windows(),
133 config.getint('Daemon', 'min_nodes'),
134 config.getint('Daemon', 'max_nodes'),
135 config.getint('Daemon', 'poll_stale_after'),
136 config.getint('Daemon', 'boot_fail_after'),
137 config.getint('Daemon', 'node_stale_after'),
138 node_setup, node_shutdown, node_monitor,
139 max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
141 WatchdogActor.start(config.getint('Daemon', 'watchdog'),
142 cloud_node_poller.actor_ref,
143 arvados_node_poller.actor_ref,
144 job_queue_poller.actor_ref,
145 node_daemon.actor_ref)
148 daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
149 while not daemon_stopped():
152 logging.exception("Uncaught exception during setup")
154 pykka.ActorRegistry.stop_all()
157 if __name__ == '__main__':