3 from __future__ import absolute_import, print_function
15 from . import config as nmconfig
16 from .baseactor import WatchdogActor
17 from .daemon import NodeManagerDaemonActor
18 from .jobqueue import JobQueueMonitorActor, ServerCalculator
19 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
20 from .timedcallback import TimedCallBackActor
21 from ._version import __version__
def abort(msg, code=1):
    """Print an error message prefixed with the program name, then exit.

    :param msg: human-readable description of the problem.
    :param code: process exit status; defaults to 1.
    :raises SystemExit: always, carrying *code* as the exit status.
    """
    # Diagnostics go to stderr so they are not mixed into normal output.
    print("arvados-node-manager: " + msg, file=sys.stderr)
    # Callers rely on abort() not returning (load_config would otherwise go
    # on to open() a missing path), so terminate with the requested status.
    sys.exit(code)
def parse_cli(args=None):
    """Parse the node manager's command-line arguments.

    :param args: list of argument strings to parse; when None (the
        default) argparse reads sys.argv[1:].
    :returns: an argparse.Namespace with ``foreground`` (bool, default
        False) and ``config`` (path string, or None when not given).

    ``--version`` prints the program path and package version and exits.
    """
    parser = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    parser.add_argument(
        '--version', action='version',
        version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    parser.add_argument(
        '--foreground', action='store_true', default=False,
        help="Run in the foreground. Don't daemonize.")
    parser.add_argument(
        '--config', help="Path to configuration file")
    return parser.parse_args(args)
def load_config(path):
    """Read and return the node manager configuration stored at *path*.

    A missing path is reported through abort() with exit code 2.  A file
    that cannot be opened or read is reported through abort() with its
    default code.

    :param path: filesystem path from ``--config``; may be None.
    :returns: the populated nmconfig.NodeManagerConfig instance.
    """
    # argparse leaves an omitted --config as None; an empty string is just
    # as unusable, so reject both up front with a clear message.
    if not path:
        abort("No --config file specified", 2)
    config = nmconfig.NodeManagerConfig()
    try:
        with open(path) as config_file:
            config.readfp(config_file)
    except (IOError, OSError) as error:
        abort("Error reading configuration file {}: {}".format(path, error))
    return config
def setup_logging(path, level, **sublevels):
    """Attach a file handler to the root logger and apply log levels.

    :param path: file that log records are appended to.
    :param level: level assigned to the root logger.
    :param sublevels: mapping of logger name -> level for named loggers.
    :returns: the root logger, so the caller can log immediately.
    """
    handler = logging.FileHandler(path)
    # NOTE(review): this Formatter call originally continued onto a line that
    # is not visible here (likely a datefmt argument); the default date
    # format is used below -- confirm the intended datefmt.
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # items() works on both Python 2 and 3; iteritems() is Python 2 only.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
    # main() calls .info() on the result, so the logger must be returned.
    return root_logger
def build_server_calculator(config):
    """Create the ServerCalculator from the configured cloud node sizes.

    The cloud provider's size list is passed through config.node_sizes();
    if that yields nothing, abort() is called.  The calculator also receives
    the [Daemon] max_nodes, max_total_price and node_mem_scaling settings,
    in that order.
    """
    provider_sizes = config.new_cloud_client().list_sizes()
    usable_sizes = config.node_sizes(provider_sizes)
    if not usable_sizes:
        abort("No valid node sizes configured")
    limits = (
        config.getint('Daemon', 'max_nodes'),
        config.getfloat('Daemon', 'max_total_price'),
        config.getfloat('Daemon', 'node_mem_scaling'),
    )
    return ServerCalculator(usable_sizes, *limits)
def launch_pollers(config, server_calculator):
    """Start the shared timer actor and the three polling actors.

    Polling intervals come from the [Daemon] poll_time and max_poll_time
    settings.  Returns the tell-proxies of the started actors as
    (timer, cloud_node_poller, arvados_node_poller, job_queue_poller).
    """
    poll_time, max_poll_time = [config.getint('Daemon', option)
                                for option in ('poll_time', 'max_poll_time')]
    intervals = (poll_time, max_poll_time)

    # The timer actor is started with one tenth of poll_time.
    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
    cloud_node_poller = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, server_calculator,
        *intervals).tell_proxy()
    arvados_node_poller = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer, *intervals).tell_proxy()
    job_queue_poller = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        *intervals).tell_proxy()
    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
def shutdown_signal(signal_code, frame):
    """Handle a termination signal, escalating on repeated deliveries.

    Deliveries of each signal number are counted in _caught_signals:

    * if the daemon actor does not exist yet, stop every pykka actor and
      exit with status -signal_code right away;
    * first delivery: ask the daemon actor to shut down;
    * second delivery: stop every registered pykka actor;
    * any later delivery: exit with status -signal_code.

    :param signal_code: the signal number being handled.
    :param frame: required by the signal-handler protocol; unused.
    """
    current_count = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = current_count + 1
    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
    elif current_count == 0:
        node_daemon.shutdown()
    elif current_count == 1:
        pykka.ActorRegistry.stop_all()
    else:
        # Last resort: the graceful paths above did not finish in time.
        sys.exit(-signal_code)
# Body of what is presumably the program's main() function.
# NOTE(review): several lines of this function appear to be missing from
# this copy: the function header (and, presumably, a `global node_daemon`
# declaration), the try/except/finally lines around the exception logging
# at the end, the body of the wait loop, and one positional argument to
# NodeManagerDaemonActor.start.  Restore them before relying on this code.
# Parse the command line, then load the file named by --config.
107 args = parse_cli(args)
108 config = load_config(args.config)
# Daemonize via daemon.DaemonContext() unless --foreground was given.
110 if not args.foreground:
111 daemon.DaemonContext().open()
# Route SIGINT, SIGQUIT and SIGTERM to shutdown_signal.
112 for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
113 signal.signal(sigcode, shutdown_signal)
# Log to the configured [Logging] file with the configured levels, and
# record the program path, node manager version and libcloud version.
116 root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
117 root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
# Classes selected by the configuration: node_update is started as an actor
# below; the other three are handed to the daemon actor.
118 node_setup, node_shutdown, node_update, node_monitor = \
119 config.dispatch_classes()
120 server_calculator = build_server_calculator(config)
# Start the timer and the cloud / Arvados / job-queue pollers.
121 timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
122 launch_pollers(config, server_calculator)
# The updater receives the bound method config.new_cloud_client itself (not
# a client instance), presumably so it can create clients on demand.
123 cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
# Start the central daemon actor.  Binding node_daemon is what lets
# shutdown_signal (which checks it for None) request a graceful shutdown.
124 node_daemon = NodeManagerDaemonActor.start(
125 job_queue_poller, arvados_node_poller, cloud_node_poller,
126 cloud_node_updater, timer,
127 config.new_arvados_client, config.new_cloud_client,
128 config.shutdown_windows(),
# NOTE(review): a positional argument appears to be missing here, between
# shutdown_windows() and min_nodes; confirm against the signature of
# NodeManagerDaemonActor before relying on this argument order.
130 config.getint('Daemon', 'min_nodes'),
131 config.getint('Daemon', 'max_nodes'),
132 config.getint('Daemon', 'poll_stale_after'),
133 config.getint('Daemon', 'boot_fail_after'),
134 config.getint('Daemon', 'node_stale_after'),
135 node_setup, node_shutdown, node_monitor,
136 max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
# Start a WatchdogActor over the three pollers and the daemon actor, using
# the [Daemon] watchdog setting.
138 WatchdogActor.start(config.getint('Daemon', 'watchdog'),
139 cloud_node_poller.actor_ref,
140 arvados_node_poller.actor_ref,
141 job_queue_poller.actor_ref,
142 node_daemon.actor_ref)
# Keep running until the daemon actor has stopped.
# NOTE(review): the body of this loop is not visible here.
145 daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
146 while not daemon_stopped():
# Log any exception that escaped the startup code above; the matching
# `except` line is not visible here.
149 logging.exception("Uncaught exception during setup")
# Stop every remaining pykka actor on the way out (presumably inside a
# `finally:` clause that is not visible here).
151 pykka.ActorRegistry.stop_all()
154 if __name__ == '__main__':