3 from __future__ import absolute_import, print_function
14 from . import config as nmconfig
15 from .baseactor import WatchdogActor
16 from .daemon import NodeManagerDaemonActor
17 from .jobqueue import JobQueueMonitorActor, ServerCalculator
18 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
19 from .timedcallback import TimedCallBackActor
# Report a fatal problem: prints `msg` to standard output, prefixed with
# "arvados-node-manager: ".
# NOTE(review): `code` (default 1) is not used by any line visible here, and
# the embedded numbering jumps from 24 to 28, so the remainder of the body is
# not shown -- presumably it exits the process with `code`; confirm against
# the full file.
23 def abort(msg, code=1):
24 print("arvados-node-manager: " + msg)
# Command-line parsing: builds an argparse parser for the
# 'arvados-node-manager' program and returns the namespace produced by
# parsing `args`.
# NOTE(review): the enclosing `def` line (embedded line 27) is not visible
# in this view -- presumably it is the parse_cli(args) called further down;
# confirm against the full file.
28 parser = argparse.ArgumentParser(
29 prog='arvados-node-manager',
30 description="Dynamically allocate Arvados cloud compute nodes")
# --foreground: boolean flag that is False unless given on the command line.
# NOTE(review): the call that owns this argument list (embedded line 31) is
# missing here -- presumably parser.add_argument(...); confirm.
32 '--foreground', action='store_true', default=False,
33 help="Run in the foreground. Don't daemonize.")
# --config: path to the configuration file.
# NOTE(review): the owning call (embedded line 34) is likewise missing.
35 '--config', help="Path to configuration file")
36 return parser.parse_args(args)
# Load the node manager configuration from the file at `path`.
# Creates an nmconfig.NodeManagerConfig, feeds it the opened file through
# readfp(), and calls abort() with a message naming the path and the error
# when opening or reading raises IOError/OSError.
# NOTE(review): embedded lines 39, 42 and 47 are missing from this view --
# presumably the guard for an empty `path`, the `try:` that matches the
# `except` below, and the final return of `config`; confirm against the
# full file.
38 def load_config(path):
# abort() is given code=2 for the missing --config case.
40 abort("No --config file specified", 2)
41 config = nmconfig.NodeManagerConfig()
# The `with` block closes the file once readfp() is done with it.
43 with open(path) as config_file:
44 config.readfp(config_file)
45 except (IOError, OSError) as error:
46 abort("Error reading configuration file {}: {}".format(path, error))
# Configure file-based logging for the process.
#   path      -- log file handed to logging.FileHandler
#   level     -- level set on the root logger
#   sublevels -- keyword arguments mapping a logger name to the level that
#                is set on that named logger
49 def setup_logging(path, level, **sublevels):
50 handler = logging.FileHandler(path)
# Each record is rendered as: time, logger name, [pid], level name, message.
# NOTE(review): embedded line 53 is missing, so the Formatter(...) call is
# not closed in this view -- presumably a date-format argument follows;
# confirm against the full file.
51 handler.setFormatter(logging.Formatter(
52 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
54 root_logger = logging.getLogger()
55 root_logger.addHandler(handler)
56 root_logger.setLevel(level)
# NOTE(review): dict.iteritems() exists only on Python 2; .items() would
# behave the same here and also work on Python 3.
57 for logger_name, sublevel in sublevels.iteritems():
58 sublogger = logging.getLogger(logger_name)
59 sublogger.setLevel(sublevel)
def build_server_calculator(config):
    """Build the ServerCalculator for the configured cloud node sizes.

    Asks a newly created cloud client for its size list, passes that list
    through ``config.node_sizes``, and hands the result to
    ``ServerCalculator`` together with the ``max_nodes`` and
    ``max_total_price`` settings from the ``Daemon`` section.

    ``abort`` is called with "No valid node sizes configured" when
    ``config.node_sizes`` yields an empty result.
    """
    available_sizes = config.new_cloud_client().list_sizes()
    usable_sizes = config.node_sizes(available_sizes)
    if not usable_sizes:
        abort("No valid node sizes configured")
    max_nodes = config.getint('Daemon', 'max_nodes')
    max_total_price = config.getfloat('Daemon', 'max_total_price')
    return ServerCalculator(usable_sizes, max_nodes, max_total_price)
def launch_pollers(config, server_calculator):
    """Start the timer actor and the three polling actors.

    Reads ``poll_time`` and ``max_poll_time`` from the ``Daemon`` section.
    The timer actor is started with ``poll_time / 10.0``; each poller is
    started with its own new client, the timer proxy and both poll
    settings.  The job queue poller additionally receives
    ``server_calculator``.

    Returns a 4-tuple of tell-proxies:
    ``(timer, cloud_node_poller, arvados_node_poller, job_queue_poller)``.
    """
    interval = config.getint('Daemon', 'poll_time')
    max_interval = config.getint('Daemon', 'max_poll_time')

    timer_ref = TimedCallBackActor.start(interval / 10.0)
    timer = timer_ref.tell_proxy()

    cloud_ref = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, interval, max_interval)
    cloud_node_poller = cloud_ref.tell_proxy()

    arvados_ref = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer, interval, max_interval)
    arvados_node_poller = arvados_ref.tell_proxy()

    queue_ref = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        interval, max_interval)
    job_queue_poller = queue_ref.tell_proxy()

    return (timer, cloud_node_poller, arvados_node_poller, job_queue_poller)
# Signal handler (signature matches what signal.signal() expects).
# Counts how many times each signal code has been caught in the
# _caught_signals mapping (defined outside this view) and escalates:
#   * node_daemon is None    -> stop all pykka actors, exit with -signal_code
#   * first time this signal -> ask node_daemon to shut down
#   * second time            -> stop all pykka actors
# `frame` is not used.
84 def shutdown_signal(signal_code, frame):
# current_count is the number of times this signal was seen BEFORE this call.
85 current_count = _caught_signals.get(signal_code, 0)
86 _caught_signals[signal_code] = current_count + 1
87 if node_daemon is None:
88 pykka.ActorRegistry.stop_all()
89 sys.exit(-signal_code)
90 elif current_count == 0:
91 node_daemon.shutdown()
92 elif current_count == 1:
93 pykka.ActorRegistry.stop_all()
# NOTE(review): embedded line 94 is missing between the stop_all() above and
# the exit below -- presumably an `else:` so that the exit applies to the
# third and later deliveries of the same signal; confirm against the full file.
95 sys.exit(-signal_code)
# Entry-point body.  Visible steps: parse the command line, load the
# configuration, daemonize unless --foreground was given, install the signal
# handlers, set up logging, start the pollers, the cloud node updater, the
# node manager daemon actor and the watchdog, then wait for the daemon actor
# to stop; all pykka actors are stopped at the end.
# NOTE(review): the enclosing `def` line is not visible, and embedded lines
# 98, 101, 106-107, 120, 128, 134-135, 138-139 and 141 are missing --
# presumably including the try/except/finally keywords and the body of the
# wait loop; confirm against the full file.
99 args = parse_cli(args)
100 config = load_config(args.config)
# Detach from the terminal unless asked to stay in the foreground.
102 if not args.foreground:
103 daemon.DaemonContext().open()
# Route SIGINT, SIGQUIT and SIGTERM to shutdown_signal.
104 for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
105 signal.signal(sigcode, shutdown_signal)
# config.log_levels() is expanded into setup_logging's keyword arguments.
108 setup_logging(config.get('Logging', 'file'), **config.log_levels())
109 node_setup, node_shutdown, node_update, node_monitor = \
110 config.dispatch_classes()
111 server_calculator = build_server_calculator(config)
112 timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
113 launch_pollers(config, server_calculator)
# The updater is given the client factory itself (config.new_cloud_client is
# passed uncalled), not a client instance.
114 cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
# NOTE(review): one argument line (embedded line 120) is missing from the
# call below.
115 node_daemon = NodeManagerDaemonActor.start(
116 job_queue_poller, arvados_node_poller, cloud_node_poller,
117 cloud_node_updater, timer,
118 config.new_arvados_client, config.new_cloud_client,
119 config.shutdown_windows(),
121 config.getint('Daemon', 'min_nodes'),
122 config.getint('Daemon', 'max_nodes'),
123 config.getint('Daemon', 'poll_stale_after'),
124 config.getint('Daemon', 'boot_fail_after'),
125 config.getint('Daemon', 'node_stale_after'),
126 node_setup, node_shutdown, node_monitor,
127 max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
# The watchdog is started with the 'watchdog' setting and the actor refs of
# the three pollers and the daemon.
129 WatchdogActor.start(config.getint('Daemon', 'watchdog'),
130 cloud_node_poller.actor_ref,
131 arvados_node_poller.actor_ref,
132 job_queue_poller.actor_ref,
133 node_daemon.actor_ref)
# Loop until the daemon actor's stopped event is set (loop body not visible).
136 daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
137 while not daemon_stopped():
# Logs the active exception with its traceback on the root logger.
140 logging.exception("Uncaught exception during setup")
142 pykka.ActorRegistry.stop_all()
145 if __name__ == '__main__':