3 from __future__ import absolute_import, print_function
14 from . import config as nmconfig
15 from .baseactor import WatchdogActor
16 from .daemon import NodeManagerDaemonActor
17 from .jobqueue import JobQueueMonitorActor, ServerCalculator
18 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
19 from .timedcallback import TimedCallBackActor
20 from ._version import __version__
def abort(msg, code=1):
    """Report a fatal error and terminate the process.

    Prints ``msg`` to standard output, prefixed with the program name,
    then raises SystemExit via sys.exit().

    Arguments:
      msg: Human-readable description of the failure.
      code: Process exit status to use (default 1).
    """
    print("arvados-node-manager: " + msg)
    # Callers treat abort() as non-returning, so the exit is essential.
    sys.exit(code)
29 parser = argparse.ArgumentParser(
30 prog='arvados-node-manager',
31 description="Dynamically allocate Arvados cloud compute nodes")
33 '--version', action='version',
34 version="%s %s" % (sys.argv[0], __version__),
35 help='Print version and exit.')
37 '--foreground', action='store_true', default=False,
38 help="Run in the foreground. Don't daemonize.")
40 '--config', help="Path to configuration file")
41 return parser.parse_args(args)
def load_config(path):
    """Load the Node Manager configuration file at ``path``.

    Arguments:
      path: Filesystem path of the configuration file.  A false value
        (None or an empty string) is reported through abort() with
        code 2.

    Returns the nmconfig.NodeManagerConfig object after the file has
    been read into it.  Failure to open or read the file (IOError or
    OSError) is reported through abort() with its default code.
    """
    if not path:
        abort("No --config file specified", 2)
    config = nmconfig.NodeManagerConfig()
    try:
        with open(path) as config_file:
            config.readfp(config_file)
    except (IOError, OSError) as error:
        abort("Error reading configuration file {}: {}".format(path, error))
    # The caller uses the parsed configuration, so hand it back.
    return config
def setup_logging(path, level, **sublevels):
    """Send root-logger output to a file and set logger levels.

    Arguments:
      path: Path of the log file; a logging.FileHandler is opened on it
        and attached to the root logger.
      level: Level to set on the root logger.
      **sublevels: Mapping of logger name to the level that named
        logger should be set to.
    """
    handler = logging.FileHandler(path)
    # NOTE(review): the date format below is the conventional
    # second-resolution timestamp; confirm it matches the intended
    # second argument of this Formatter call.
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # dict.items() exists on both Python 2 and 3; iteritems() is
    # Python 2 only.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
def build_server_calculator(config):
    """Construct the ServerCalculator from the loaded configuration.

    Asks a new cloud client for its size list, passes that list through
    ``config.node_sizes()``, and reports through abort() when the result
    is empty.

    Returns a ServerCalculator built from the resulting sizes and the
    ``max_nodes`` and ``max_total_price`` settings of the ``Daemon``
    section.
    """
    cloud_client = config.new_cloud_client()
    usable_sizes = config.node_sizes(cloud_client.list_sizes())
    if not usable_sizes:
        abort("No valid node sizes configured")
    node_limit = config.getint('Daemon', 'max_nodes')
    price_limit = config.getfloat('Daemon', 'max_total_price')
    return ServerCalculator(usable_sizes, node_limit, price_limit)
def launch_pollers(config, server_calculator):
    """Start the timer actor and the three polling actors.

    Reads ``poll_time`` and ``max_poll_time`` from the ``Daemon``
    section.  The timer is started with one tenth of ``poll_time``;
    each poller gets its own freshly created client, the shared timer
    proxy, and both poll times.  The job queue poller additionally
    receives ``server_calculator``.

    Returns a 4-tuple of tell proxies:
    (timer, cloud_node_poller, arvados_node_poller, job_queue_poller).
    """
    interval = config.getint('Daemon', 'poll_time')
    max_interval = config.getint('Daemon', 'max_poll_time')

    timer = TimedCallBackActor.start(interval / 10.0).tell_proxy()

    cloud_client = config.new_cloud_client()
    cloud_node_poller = CloudNodeListMonitorActor.start(
        cloud_client, timer, interval, max_interval).tell_proxy()

    node_list_client = config.new_arvados_client()
    arvados_node_poller = ArvadosNodeListMonitorActor.start(
        node_list_client, timer, interval, max_interval).tell_proxy()

    queue_client = config.new_arvados_client()
    job_queue_poller = JobQueueMonitorActor.start(
        queue_client, timer, server_calculator,
        interval, max_interval).tell_proxy()

    return (timer, cloud_node_poller, arvados_node_poller, job_queue_poller)
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates on repeated delivery of a signal.

    Counts how many times ``signal_code`` has been caught so far (in the
    module-level ``_caught_signals`` dict) and reacts accordingly:

    * No node daemon yet: stop every actor and exit immediately.
    * First delivery: ask the node daemon to shut down.
    * Second delivery: stop every registered actor.
    * Any later delivery: exit the process outright.

    Arguments:
      signal_code: Number of the signal being handled; the process exit
        status, when one is used, is its negation.
      frame: Current stack frame supplied by the signal module (unused).
    """
    current_count = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = current_count + 1
    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
    elif current_count == 0:
        node_daemon.shutdown()
    elif current_count == 1:
        pykka.ActorRegistry.stop_all()
    else:
        # Last resort after the gentler attempts above have been tried.
        sys.exit(-signal_code)
104 args = parse_cli(args)
105 config = load_config(args.config)
107 if not args.foreground:
108 daemon.DaemonContext().open()
109 for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
110 signal.signal(sigcode, shutdown_signal)
113 setup_logging(config.get('Logging', 'file'), **config.log_levels())
114 node_setup, node_shutdown, node_update, node_monitor = \
115 config.dispatch_classes()
116 server_calculator = build_server_calculator(config)
117 timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
118 launch_pollers(config, server_calculator)
119 cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
120 node_daemon = NodeManagerDaemonActor.start(
121 job_queue_poller, arvados_node_poller, cloud_node_poller,
122 cloud_node_updater, timer,
123 config.new_arvados_client, config.new_cloud_client,
124 config.shutdown_windows(),
126 config.getint('Daemon', 'min_nodes'),
127 config.getint('Daemon', 'max_nodes'),
128 config.getint('Daemon', 'poll_stale_after'),
129 config.getint('Daemon', 'boot_fail_after'),
130 config.getint('Daemon', 'node_stale_after'),
131 node_setup, node_shutdown, node_monitor,
132 max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
134 WatchdogActor.start(config.getint('Daemon', 'watchdog'),
135 cloud_node_poller.actor_ref,
136 arvados_node_poller.actor_ref,
137 job_queue_poller.actor_ref,
138 node_daemon.actor_ref)
141 daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
142 while not daemon_stopped():
145 logging.exception("Uncaught exception during setup")
147 pykka.ActorRegistry.stop_all()
150 if __name__ == '__main__':