3 from __future__ import absolute_import, print_function
14 from . import config as nmconfig
15 from .daemon import NodeManagerDaemonActor
16 from .jobqueue import JobQueueMonitorActor, ServerCalculator
17 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
18 from .timedcallback import TimedCallBackActor
def abort(msg, code=1):
    """Print a fatal error message and terminate the process.

    The message is written to standard output, prefixed with the program
    name, and then SystemExit is raised so the caller never continues
    past a fatal condition.

    Arguments:
    * msg: Human-readable description of the problem.
    * code: Process exit status handed to sys.exit() (default 1).

    Raises:
    * SystemExit: always, carrying `code`.
    """
    # Imported locally so the function does not depend on which
    # module-level imports happen to be present.
    import sys
    print("arvados-node-manager: " + msg)
    sys.exit(code)
# Command-line parsing for the `arvados-node-manager` program: builds an
# argparse parser, declares the --foreground and --config options, and
# returns the namespace produced by parsing `args`.
# NOTE(review): the enclosing `def` line and the opening of both option
# declarations are not visible in this extract (the embedded numbering skips
# 26, 30 and 33) -- presumably `parser.add_argument(` calls; confirm against
# the full file.
27 parser = argparse.ArgumentParser(
28 prog='arvados-node-manager',
29 description="Dynamically allocate Arvados cloud compute nodes")
# --foreground is a boolean switch that stays False unless given.
31 '--foreground', action='store_true', default=False,
32 help="Run in the foreground. Don't daemonize.")
# --config takes the configuration file path; no default is declared here.
34 '--config', help="Path to configuration file")
35 return parser.parse_args(args)
# Build a NodeManagerConfig and populate it from the file at `path`.
# Problems are reported through abort(): a missing path passes 2 as abort()'s
# `code` argument, a read failure uses abort()'s default.
37 def load_config(path):
# NOTE(review): the condition guarding this call is not visible (numbering
# skips 38) -- presumably a check that `path` is empty; confirm.
39 abort("No --config file specified", 2)
40 config = nmconfig.NodeManagerConfig()
# NOTE(review): the `try:` matching the `except` below is not visible
# (numbering skips 41).
42 with open(path) as config_file:
43 config.readfp(config_file)
# IOError/OSError from opening or reading the file are turned into an
# abort() message that names the path and the underlying error.
44 except (IOError, OSError) as error:
45 abort("Error reading configuration file {}: {}".format(path, error))
# NOTE(review): no return statement is visible here (numbering skips 46),
# yet the caller assigns this function's result -- confirm `config` is
# returned in the full file.
# Configure logging to a file.
#   path      -- file that receives the log records (logging.FileHandler).
#   level     -- level applied to the root logger.
#   sublevels -- keyword arguments mapping a logger name to the level that
#                named logger should use.
48 def setup_logging(path, level, **sublevels):
49 handler = logging.FileHandler(path)
# Each record shows timestamp, logger name, process id, level and message.
50 handler.setFormatter(logging.Formatter(
51 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
# NOTE(review): the remainder of the Formatter(...) call, including its
# closing parentheses, is not visible in this extract (numbering skips 52).
53 root_logger = logging.getLogger()
54 root_logger.addHandler(handler)
55 root_logger.setLevel(level)
# NOTE(review): dict.iteritems() exists only on Python 2; .items() would be
# needed for this loop to run on Python 3.
56 for logger_name, sublevel in sublevels.iteritems():
57 sublogger = logging.getLogger(logger_name)
58 sublogger.setLevel(sublevel)
def build_server_calculator(config):
    """Create the ServerCalculator for the configured node sizes.

    Asks a new cloud client for the sizes it offers, lets
    config.node_sizes() select from them, and calls abort() with an
    explanatory message when nothing is selected.

    Arguments:
    * config: Configuration object providing new_cloud_client(),
      node_sizes(), getint() and getfloat().

    Returns a ServerCalculator built from the selected sizes and the
    [Daemon] max_nodes and max_total_price settings.
    """
    offered_sizes = config.new_cloud_client().list_sizes()
    usable_sizes = config.node_sizes(offered_sizes)
    if not usable_sizes:
        abort("No valid node sizes configured")
    node_limit = config.getint('Daemon', 'max_nodes')
    price_limit = config.getfloat('Daemon', 'max_total_price')
    return ServerCalculator(usable_sizes, node_limit, price_limit)
def launch_pollers(config, server_calculator):
    """Start the timer actor and the three polling actors.

    Every poller receives its own freshly created client, the shared
    timer proxy, and the [Daemon] poll_time / max_poll_time settings.
    The job queue poller additionally receives `server_calculator`.

    Arguments:
    * config: Configuration object providing getint(),
      new_cloud_client() and new_arvados_client().
    * server_calculator: Object handed to the job queue poller.

    Returns a 4-tuple of tell proxies:
    (timer, cloud_node_poller, arvados_node_poller, job_queue_poller).
    """
    interval = config.getint('Daemon', 'poll_time')
    max_interval = config.getint('Daemon', 'max_poll_time')

    # The timer is started with one tenth of the configured poll time.
    timer_ref = TimedCallBackActor.start(interval / 10.0)
    timer = timer_ref.tell_proxy()

    cloud_ref = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, interval, max_interval)
    cloud_node_poller = cloud_ref.tell_proxy()

    arvados_ref = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer, interval, max_interval)
    arvados_node_poller = arvados_ref.tell_proxy()

    queue_ref = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        interval, max_interval)
    job_queue_poller = queue_ref.tell_proxy()

    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
# Signal handler that escalates with each repeat of the same signal.
#   signal_code -- number of the signal being delivered.
#   frame       -- handler argument required by the signal module; unused.
# `_caught_signals` and `node_daemon` are module-level names whose
# definitions are not visible in this extract.
83 def shutdown_signal(signal_code, frame):
# Count how many times this particular signal has been seen so far.
84 current_count = _caught_signals.get(signal_code, 0)
85 _caught_signals[signal_code] = current_count + 1
# No daemon actor yet: stop every pykka actor and exit with the negated
# signal number as the status.
86 if node_daemon is None:
87 pykka.ActorRegistry.stop_all()
88 sys.exit(-signal_code)
# First delivery: ask the daemon actor to shut down.
89 elif current_count == 0:
90 node_daemon.shutdown()
# Second delivery: stop all actors.
91 elif current_count == 1:
92 pykka.ActorRegistry.stop_all()
# NOTE(review): the embedded numbering skips 93 and indentation is lost, so
# the branch owning the next line is not visible -- presumably an `else:`
# that exits on the third and later deliveries; confirm against the full file.
94 sys.exit(-signal_code)
# Program start-up sequence: parse the command line, load the configuration,
# optionally daemonize, install signal handlers, configure logging, start the
# polling actors and the node manager daemon actor, then wait for the daemon
# actor to stop.
# NOTE(review): the enclosing `def` line is not visible in this extract
# (presumably the entry-point function), and the embedded numbering has gaps
# (97, 100, 105-106, 119, 127-128, 131-132, 134, 136-137), so the try/except
# scaffolding and the wait-loop body cannot be seen here -- confirm against
# the full file.
98 args = parse_cli(args)
99 config = load_config(args.config)
# Unless --foreground was given, open a DaemonContext (presumably from the
# python-daemon package -- confirm).
101 if not args.foreground:
102 daemon.DaemonContext().open()
# Route SIGINT, SIGQUIT and SIGTERM to shutdown_signal.
103 for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
104 signal.signal(sigcode, shutdown_signal)
# The log file path comes from [Logging] file; config.log_levels() supplies
# the remaining setup_logging() keyword arguments.
107 setup_logging(config.get('Logging', 'file'), **config.log_levels())
108 node_setup, node_shutdown, node_update, node_monitor = \
109 config.dispatch_classes()
110 server_calculator = build_server_calculator(config)
111 timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
112 launch_pollers(config, server_calculator)
# The client factory itself (not a client instance) is handed to the updater.
113 cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
# node_daemon is the name shutdown_signal() inspects to decide how to react.
# NOTE(review): a `global node_daemon` declaration is presumably on one of the
# lines not visible here; confirm.
114 node_daemon = NodeManagerDaemonActor.start(
115 job_queue_poller, arvados_node_poller, cloud_node_poller,
116 cloud_node_updater, timer,
117 config.new_arvados_client, config.new_cloud_client,
118 config.shutdown_windows(),
# NOTE(review): numbering skips 119 -- one argument of this call is not
# visible in this extract.
120 config.getint('Daemon', 'min_nodes'),
121 config.getint('Daemon', 'max_nodes'),
122 config.getint('Daemon', 'poll_stale_after'),
123 config.getint('Daemon', 'boot_fail_after'),
124 config.getint('Daemon', 'node_stale_after'),
125 node_setup, node_shutdown, node_monitor,
126 max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
# Keep going until the daemon actor's stopped event is set.
129 daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
130 while not daemon_stopped():
# NOTE(review): the loop body is not visible (numbering skips 131-132).
# The next line logs the active exception with its traceback; presumably it
# sits in an `except` handler that is not visible here.
133 logging.exception("Uncaught exception during setup")
# Stop every registered pykka actor.
135 pykka.ActorRegistry.stop_all()
138 if __name__ == '__main__':