3 from __future__ import absolute_import, print_function
15 from . import config as nmconfig
16 from .baseactor import WatchdogActor
17 from .daemon import NodeManagerDaemonActor
18 from .jobqueue import JobQueueMonitorActor, ServerCalculator
19 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
20 from .timedcallback import TimedCallBackActor
21 from ._version import __version__
# Report a fatal error to stdout, prefixed with "arvados-node-manager: ".
# msg: human-readable error text.
# code: intended process exit status (default 1); presumably handed to
#   sys.exit() so the process terminates — confirm.
25 def abort(msg, code=1):
26 print("arvados-node-manager: " + msg)
# Build the arvados-node-manager command-line parser and parse *args*.
# Options:
#   --version     print "<argv[0]> <__version__>" and exit
#   --foreground  run in the foreground instead of daemonizing (default False)
#   --config      path to the configuration file
# Returns the namespace produced by parser.parse_args(args).
30 parser = argparse.ArgumentParser(
31 prog='arvados-node-manager',
32 description="Dynamically allocate Arvados cloud compute nodes")
34 '--version', action='version',
35 version="%s %s" % (sys.argv[0], __version__),
36 help='Print version and exit.')
38 '--foreground', action='store_true', default=False,
39 help="Run in the foreground. Don't daemonize.")
41 '--config', help="Path to configuration file")
42 return parser.parse_args(args)
# Read the Node Manager configuration file at *path* into a
# nmconfig.NodeManagerConfig.
# Calls abort(..., 2) with "No --config file specified" (presumably when
# *path* is empty/None — the guarding condition should be confirmed), and
# abort() with the default code if opening/reading raises IOError or OSError.
# The caller uses the result as the config object; presumably it returns the
# populated NodeManagerConfig — confirm.
# NOTE(review): ConfigParser.readfp() is deprecated on Python 3 (use
# read_file()); fine while this module targets Python 2.
44 def load_config(path):
46 abort("No --config file specified", 2)
47 config = nmconfig.NodeManagerConfig()
49 with open(path) as config_file:
50 config.readfp(config_file)
51 except (IOError, OSError) as error:
52 abort("Error reading configuration file {}: {}".format(path, error))
# Configure logging: attach a FileHandler writing to *path* to the root
# logger, formatted as "<asctime> <name>[<pid>] <levelname>: <message>".
# level: level set on the root logger.
# sublevels: logger name -> level, applied to each named logger.
# NOTE(review): the caller does `root_logger = setup_logging(...)` and then
# calls .info() on the result, so this presumably returns root_logger —
# confirm.
55 def setup_logging(path, level, **sublevels):
56 handler = logging.FileHandler(path)
57 handler.setFormatter(logging.Formatter(
58 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
60 root_logger = logging.getLogger()
61 root_logger.addHandler(handler)
62 root_logger.setLevel(level)
# NOTE(review): dict.iteritems() exists only on Python 2; .items() works on
# both Python 2 and 3.
63 for logger_name, sublevel in sublevels.iteritems():
64 sublogger = logging.getLogger(logger_name)
65 sublogger.setLevel(sublevel)
def build_server_calculator(config):
    """Return a ServerCalculator built from the cloud's node sizes.

    A fresh cloud client is created from *config* to list the provider's
    sizes, which are then passed through ``config.node_sizes()``.  If that
    yields nothing, ``abort()`` is called with "No valid node sizes
    configured".  The calculator also receives the ``Daemon`` section's
    ``max_nodes`` (read as int) and ``max_total_price`` (read as float).
    """
    provider_sizes = config.new_cloud_client().list_sizes()
    usable_sizes = config.node_sizes(provider_sizes)
    if not usable_sizes:
        abort("No valid node sizes configured")
    max_nodes = config.getint('Daemon', 'max_nodes')
    max_total_price = config.getfloat('Daemon', 'max_total_price')
    return ServerCalculator(usable_sizes, max_nodes, max_total_price)
def launch_pollers(config, server_calculator):
    """Start the shared timer actor and the three polling actors.

    Intervals come from the ``Daemon`` section of *config*: ``poll_time``
    and ``max_poll_time`` (both read as int).  The timer actor is started
    with one tenth of ``poll_time``.  The cloud poller gets a new cloud
    client plus *server_calculator*; the Arvados node poller and the job
    queue poller each get their own new Arvados client, and the job queue
    poller also gets *server_calculator*.  Every actor is started and
    wrapped with ``.tell_proxy()``.

    Returns ``(timer, cloud_node_poller, arvados_node_poller,
    job_queue_poller)``.
    """
    def start_proxy(actor_class, *actor_args):
        # Start one actor and return the proxy from .tell_proxy().
        return actor_class.start(*actor_args).tell_proxy()

    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')

    timer = start_proxy(TimedCallBackActor, poll_time / 10.0)
    cloud_node_poller = start_proxy(
        CloudNodeListMonitorActor, config.new_cloud_client(), timer,
        server_calculator, poll_time, max_poll_time)
    arvados_node_poller = start_proxy(
        ArvadosNodeListMonitorActor, config.new_arvados_client(), timer,
        poll_time, max_poll_time)
    job_queue_poller = start_proxy(
        JobQueueMonitorActor, config.new_arvados_client(), timer,
        server_calculator, poll_time, max_poll_time)
    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
# Signal handler installed at startup for SIGINT, SIGQUIT and SIGTERM.
# Escalates on repeated delivery of the same signal; deliveries are counted
# per signal number in the _caught_signals mapping:
#   - node_daemon is still None: stop all pykka actors, exit(-signal_code);
#   - first delivery: ask the daemon actor to shut down (node_daemon.shutdown());
#   - second delivery: stop all pykka actors;
#   - further deliveries: presumably exit(-signal_code) — the branch header
#     guarding the final sys.exit should be confirmed.
# frame: the interrupted stack frame supplied by the signal module (unused).
91 def shutdown_signal(signal_code, frame):
92 current_count = _caught_signals.get(signal_code, 0)
93 _caught_signals[signal_code] = current_count + 1
94 if node_daemon is None:
95 pykka.ActorRegistry.stop_all()
96 sys.exit(-signal_code)
97 elif current_count == 0:
98 node_daemon.shutdown()
99 elif current_count == 1:
100 pykka.ActorRegistry.stop_all()
102 sys.exit(-signal_code)
# Startup sequence: parse the command line, load configuration, optionally
# daemonize, start all actors, then wait until the daemon actor stops.
106 args = parse_cli(args)
107 config = load_config(args.config)
# Detach via DaemonContext unless --foreground was given, then install
# shutdown_signal for SIGINT, SIGQUIT and SIGTERM. The handlers are
# installed after DaemonContext().open(); presumably so they take effect in
# the daemonized process — confirm.
109 if not args.foreground:
110 daemon.DaemonContext().open()
111 for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
112 signal.signal(sigcode, shutdown_signal)
# Configure logging from the [Logging] section, log version info, then
# build the server calculator and start the timer and poller actors.
115 root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
116 root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
117 node_setup, node_shutdown, node_update, node_monitor = \
118 config.dispatch_classes()
119 server_calculator = build_server_calculator(config)
120 timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
121 launch_pollers(config, server_calculator)
# Unlike launch_pollers(), the updater and daemon actors receive the client
# factory methods themselves (config.new_cloud_client, not a call result);
# presumably so they can create their own clients.
122 cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
# NOTE(review): node_daemon is presumably the module-level global that
# shutdown_signal() checks; confirm a `global node_daemon` declaration.
123 node_daemon = NodeManagerDaemonActor.start(
124 job_queue_poller, arvados_node_poller, cloud_node_poller,
125 cloud_node_updater, timer,
126 config.new_arvados_client, config.new_cloud_client,
127 config.shutdown_windows(),
129 config.getint('Daemon', 'min_nodes'),
130 config.getint('Daemon', 'max_nodes'),
131 config.getint('Daemon', 'poll_stale_after'),
132 config.getint('Daemon', 'boot_fail_after'),
133 config.getint('Daemon', 'node_stale_after'),
134 node_setup, node_shutdown, node_monitor,
135 max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
# Start a WatchdogActor with the [Daemon] watchdog setting, watching the
# three pollers and the daemon actor.
137 WatchdogActor.start(config.getint('Daemon', 'watchdog'),
138 cloud_node_poller.actor_ref,
139 arvados_node_poller.actor_ref,
140 job_queue_poller.actor_ref,
141 node_daemon.actor_ref)
# Keep the main thread alive until the daemon actor's stopped flag is set.
144 daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
145 while not daemon_stopped():
# Log the active exception with its traceback (logging.exception); presumably
# this sits in an except handler around the startup sequence — confirm.
148 logging.exception("Uncaught exception during setup")
# Cleanup: stop every remaining pykka actor (presumably in a finally clause).
150 pykka.ActorRegistry.stop_all()
# Script entry-point guard; the guarded statement presumably invokes the
# startup routine defined above — confirm.
153 if __name__ == '__main__':