3 from __future__ import absolute_import, print_function
15 from . import config as nmconfig
17 from .baseactor import WatchdogActor
18 from .daemon import NodeManagerDaemonActor
19 from .jobqueue import JobQueueMonitorActor, ServerCalculator
20 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
21 from .timedcallback import TimedCallBackActor
22 from ._version import __version__
# Print a fatal error message prefixed with the program name.
#   msg:  text to report.
#   code: exit status the caller asks for (default 1; load_config passes 2).
# NOTE(review): the statements shown here never use `code` and do not end the
# process, yet callers (load_config, build_server_calculator) go on to their
# next statement after calling abort(). Confirm this function exits, e.g.
# with sys.exit(code).
# NOTE(review): print() writes to stdout; error text usually belongs on
# sys.stderr.
27 def abort(msg, code=1):
28 print("arvados-node-manager: " + msg)
# Build the arvados-node-manager command-line parser and parse `args`.
# argparse reads sys.argv[1:] when `args` is None.
# Options:
#   --version     print "<sys.argv[0]> <__version__>" and exit.
#   --foreground  store_true (default False); main skips daemonizing when set.
#   --config      path to the configuration file; None when omitted.
# Returns the argparse.Namespace produced by parse_args().
32 parser = argparse.ArgumentParser(
33 prog='arvados-node-manager',
34 description="Dynamically allocate Arvados cloud compute nodes")
36 '--version', action='version',
37 version="%s %s" % (sys.argv[0], __version__),
38 help='Print version and exit.')
40 '--foreground', action='store_true', default=False,
41 help="Run in the foreground. Don't daemonize.")
43 '--config', help="Path to configuration file")
44 return parser.parse_args(args)
# Read the node manager configuration file at `path`.
# A missing --config value is reported through abort() with code 2.
# Otherwise a NodeManagerConfig is created and filled from the open file via
# readfp(). An IOError/OSError from opening or reading the file is reported
# through abort() (default code 1), together with the path and the error.
# The entry point uses the result as the loaded config, so this presumably
# returns `config` — confirm.
# NOTE(review): if NodeManagerConfig derives from ConfigParser, readfp() is
# deprecated on Python 3 (read_file() replaces it; readfp was removed in 3.12).
46 def load_config(path):
48 abort("No --config file specified", 2)
49 config = nmconfig.NodeManagerConfig()
# The with-block closes the file even if parsing raises.
51 with open(path) as config_file:
52 config.readfp(config_file)
53 except (IOError, OSError) as error:
54 abort("Error reading configuration file {}: {}".format(path, error))
# Configure logging to the file at `path`.
# Attaches a FileHandler to the root logger and formats records as
# "<time> <logger name>[<pid>] <LEVEL>: <message>". Sets the root logger's
# level to `level`. Each keyword in `sublevels` names a logger, whose level is
# set to that keyword's value (the entry point passes config.log_levels()).
# The entry point uses the return value as the root logger.
# NOTE(review): each call adds another FileHandler to the root logger.
57 def setup_logging(path, level, **sublevels):
58 handler = logging.FileHandler(path)
59 handler.setFormatter(logging.Formatter(
60 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
62 root_logger = logging.getLogger()
63 root_logger.addHandler(handler)
64 root_logger.setLevel(level)
# NOTE(review): dict.iteritems() exists only on Python 2; sublevels.items()
# works on both Python 2 and 3.
65 for logger_name, sublevel in sublevels.iteritems():
66 sublogger = logging.getLogger(logger_name)
67 sublogger.setLevel(sublevel)
def build_server_calculator(config):
    """Build the ServerCalculator from the configured cloud node sizes.

    Lists the sizes offered by a newly created cloud client and passes them
    through ``config.node_sizes()``. Calls ``abort()`` if that result is
    empty or otherwise falsy.

    The calculator also receives these ``[Daemon]`` settings, in order:
    ``max_nodes`` (int), ``max_total_price`` (float) and
    ``node_mem_scaling`` (float).
    """
    offered_sizes = config.new_cloud_client().list_sizes()
    size_list = config.node_sizes(offered_sizes)
    if not size_list:
        abort("No valid node sizes configured")
    max_nodes = config.getint('Daemon', 'max_nodes')
    max_total_price = config.getfloat('Daemon', 'max_total_price')
    node_mem_scaling = config.getfloat('Daemon', 'node_mem_scaling')
    return ServerCalculator(size_list, max_nodes, max_total_price,
                            node_mem_scaling)
def launch_pollers(config, server_calculator):
    """Start the shared timer actor and the three polling monitor actors.

    Reads ``poll_time`` and ``max_poll_time`` from the ``[Daemon]`` section.
    The TimedCallBackActor is started with ``poll_time / 10.0`` and handed to
    every poller.
    - The cloud node poller gets a new cloud client and the server calculator.
    - The Arvados node poller gets its own new Arvados client.
    - The job queue poller gets its own new Arvados client and the server
      calculator.

    Returns a 4-tuple of tell-proxies:
    ``(timer, cloud_node_poller, arvados_node_poller, job_queue_poller)``.
    """
    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')
    # Every poller receives the same (poll_time, max_poll_time) pair last.
    intervals = (poll_time, max_poll_time)

    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()

    cloud_client = config.new_cloud_client()
    cloud_node_poller = CloudNodeListMonitorActor.start(
        cloud_client, timer, server_calculator, *intervals).tell_proxy()

    node_list_client = config.new_arvados_client()
    arvados_node_poller = ArvadosNodeListMonitorActor.start(
        node_list_client, timer, *intervals).tell_proxy()

    job_queue_client = config.new_arvados_client()
    job_queue_poller = JobQueueMonitorActor.start(
        job_queue_client, timer, server_calculator, *intervals).tell_proxy()

    pollers = (cloud_node_poller, arvados_node_poller, job_queue_poller)
    return (timer,) + pollers
# Handler that the entry point installs for SIGINT, SIGQUIT and SIGTERM.
# Counts deliveries per signal number in _caught_signals, a module-level
# mapping (presumably a dict). It then escalates:
#   - node_daemon not started yet: stop all pykka actors and exit;
#   - first delivery of this signal: call node_daemon.shutdown();
#   - second delivery: stop all pykka actors;
#   - the final sys.exit(-signal_code) presumably handles any later delivery
#     (its enclosing branch header is not shown) — confirm.
# Counts are kept per signal number, so SIGINT followed by SIGTERM counts as
# two first deliveries.
# `frame` is required by the signal handler signature and is not used.
# NOTE(review): sys.exit() with a negative status gives a process exit status
# of 256 - signal_code on POSIX. The shell convention for "killed by signal N"
# is 128 + N.
94 def shutdown_signal(signal_code, frame):
95 current_count = _caught_signals.get(signal_code, 0)
96 _caught_signals[signal_code] = current_count + 1
97 if node_daemon is None:
98 pykka.ActorRegistry.stop_all()
99 sys.exit(-signal_code)
100 elif current_count == 0:
102 node_daemon.shutdown()
103 elif current_count == 1:
104 pykka.ActorRegistry.stop_all()
106 sys.exit(-signal_code)
# Body of the module's entry point. Its def line is not among the lines
# shown; it receives `args`, which is forwarded to parse_cli().
# Rebinds the module globals node_daemon and watchdog so that
# shutdown_signal() can reach the running actors.
109 global node_daemon, watchdog
110 args = parse_cli(args)
111 config = load_config(args.config)
# Detach into the background unless --foreground was given.
113 if not args.foreground:
114 daemon.DaemonContext().open()
# Handlers are installed after DaemonContext().open(). python-daemon's default
# signal_map installs its own SIGTERM handler, which this presumably
# overrides — confirm.
115 for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
116 signal.signal(sigcode, shutdown_signal)
118 status.Server(config).start()
# The statements below log to the [Logging] file, with per-logger levels from
# the config. An exception escaping them is presumably caught and logged as
# "Uncaught exception during setup" further down.
121 root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
122 root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
123 node_setup, node_shutdown, node_update, node_monitor = \
124 config.dispatch_classes()
125 server_calculator = build_server_calculator(config)
126 timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
127 launch_pollers(config, server_calculator)
# The updater receives the cloud client factory itself (not called), as the
# daemon actor below does. The pollers are given client instances instead.
128 cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
129 node_daemon = NodeManagerDaemonActor.start(
130 job_queue_poller, arvados_node_poller, cloud_node_poller,
131 cloud_node_updater, timer,
132 config.new_arvados_client, config.new_cloud_client,
133 config.shutdown_windows(),
135 config.getint('Daemon', 'min_nodes'),
136 config.getint('Daemon', 'max_nodes'),
137 config.getint('Daemon', 'poll_stale_after'),
138 config.getint('Daemon', 'boot_fail_after'),
139 config.getint('Daemon', 'node_stale_after'),
140 node_setup, node_shutdown, node_monitor,
141 max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
# Watchdog over the three pollers and the daemon actor. Its first argument is
# the [Daemon] watchdog setting (presumably a timeout — confirm the units).
143 watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
144 cloud_node_poller.actor_ref,
145 arvados_node_poller.actor_ref,
146 job_queue_poller.actor_ref,
147 node_daemon.actor_ref)
# Bind the is_set method once, then loop until the daemon actor has stopped.
150 daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
151 while not daemon_stopped():
# logging.exception records the message together with the active traceback.
154 logging.exception("Uncaught exception during setup")
# Stop every pykka actor that is still running.
156 pykka.ActorRegistry.stop_all()
# Entry guard: the statements under it run only when this module is executed
# directly rather than imported.
159 if __name__ == '__main__':