3 from __future__ import absolute_import, print_function
15 from . import config as nmconfig
17 from .baseactor import WatchdogActor
18 from .daemon import NodeManagerDaemonActor
19 from .jobqueue import JobQueueMonitorActor, ServerCalculator
20 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
21 from .timedcallback import TimedCallBackActor
22 from ._version import __version__
def abort(msg, code=1):
    """Print an error message prefixed with the program name, then exit.

    :param msg: Human-readable description of the failure.
    :param code: Process exit status handed to ``sys.exit`` (default 1).
    :raises SystemExit: always -- callers rely on this to stop further work.
    """
    print("arvados-node-manager: " + msg)
    # Without this the caller would continue past a fatal error and
    # `code` would be silently ignored.
    sys.exit(code)
def parse_cli(args=None):
    """Parse the node manager's command-line arguments.

    :param args: List of argument strings to parse.  ``None`` makes
        argparse fall back to ``sys.argv[1:]``.
    :returns: ``argparse.Namespace`` with ``foreground`` (bool, default
        False) and ``config`` (str or None).  ``--version`` prints the
        version string and exits.
    """
    parser = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    parser.add_argument(
        '--version', action='version',
        version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    parser.add_argument(
        '--foreground', action='store_true', default=False,
        help="Run in the foreground. Don't daemonize.")
    parser.add_argument(
        '--config', help="Path to configuration file")
    return parser.parse_args(args)
def load_config(path):
    """Read the node manager configuration file at *path* and return it.

    A missing path is reported through ``abort`` with status 2.  An
    unreadable file is reported through ``abort`` with its default status.

    :param path: Path to the configuration file, normally from ``--config``
        (argparse leaves it as None when the option is omitted).
    :returns: The populated ``nmconfig.NodeManagerConfig``.
    """
    if not path:
        abort("No --config file specified", 2)
    config = nmconfig.NodeManagerConfig()
    try:
        with open(path) as config_file:
            # NOTE(review): readfp is the Python 2 ConfigParser API; it is
            # deprecated in Python 3 (removed in 3.12) in favour of
            # read_file -- confirm which base NodeManagerConfig uses.
            config.readfp(config_file)
    except (IOError, OSError) as error:
        abort("Error reading configuration file {}: {}".format(path, error))
    return config
def setup_logging(path, level, **sublevels):
    """Attach a file handler to the root logger and set logger levels.

    :param path: File that log records are appended to, via a
        ``logging.FileHandler`` attached to the root logger.
    :param level: Level set on the root logger.
    :param sublevels: Logger name -> level; each named logger gets its level.
    :returns: The root logger.
    """
    handler = logging.FileHandler(path)
    # NOTE(review): no datefmt is passed, so %(asctime)s uses the logging
    # module's default timestamp format -- confirm this is intended.
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # items() instead of iteritems() keeps this working on Python 2 and 3.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
    return root_logger
def build_server_calculator(config):
    """Construct the ServerCalculator from cloud node sizes and daemon limits.

    The sizes listed by a fresh cloud client are passed through
    ``config.node_sizes``.  An empty result is reported through ``abort``.
    The remaining arguments come from the ``Daemon`` section:
    ``max_nodes`` (int), ``max_total_price`` and ``node_mem_scaling``
    (floats).
    """
    offered_sizes = config.new_cloud_client().list_sizes()
    usable_sizes = config.node_sizes(offered_sizes)
    if not usable_sizes:
        abort("No valid node sizes configured")
    daemon_limits = (
        config.getint('Daemon', 'max_nodes'),
        config.getfloat('Daemon', 'max_total_price'),
        config.getfloat('Daemon', 'node_mem_scaling'),
    )
    return ServerCalculator(usable_sizes, *daemon_limits)
def launch_pollers(config, server_calculator):
    """Start the shared timer actor and the three monitor actors.

    :param config: Node manager configuration.  ``Daemon.poll_time``,
        ``Daemon.max_poll_time``, ``Arvados.jobs_queue`` and
        ``Arvados.slurm_queue`` are read from it.
    :param server_calculator: Handed to the cloud node and job queue
        monitors.
    :returns: ``(timer, cloud_node_poller, arvados_node_poller,
        job_queue_poller)``, each obtained via ``.tell_proxy()``.
    """
    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')

    # NOTE(review): the timer is started with a tenth of poll_time,
    # presumably its check interval -- confirm against TimedCallBackActor.
    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
    cloud_node_poller = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, server_calculator,
        poll_time, max_poll_time).tell_proxy()
    arvados_node_poller = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer,
        poll_time, max_poll_time).tell_proxy()
    # Proxied like the other pollers so all four results are the same kind
    # of object (callers reach the ref through `.actor_ref`).
    job_queue_poller = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        config.getboolean('Arvados', 'jobs_queue'),
        config.getboolean('Arvados', 'slurm_queue'),
        poll_time, max_poll_time).tell_proxy()
    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates each time the same signal repeats.

    Per-signal delivery counts are kept in ``_caught_signals``.

    - No ``node_daemon`` yet: stop every pykka actor and exit.
    - First delivery: ask ``node_daemon`` to shut down.
    - Second delivery: stop every pykka actor and exit.

    The exit status is ``-signal_code``; ``frame`` is unused.
    NOTE(review): a third or later delivery does nothing -- confirm intended.
    """
    previous = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = previous + 1
    graceful = node_daemon is not None
    if graceful and previous == 0:
        node_daemon.shutdown()
    elif not graceful or previous == 1:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
112 global node_daemon, watchdog
113 args = parse_cli(args)
114 config = load_config(args.config)
116 if not args.foreground:
117 daemon.DaemonContext().open()
118 for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
119 signal.signal(sigcode, shutdown_signal)
121 status.Server(config).start()
124 root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
125 root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
126 node_setup, node_shutdown, node_update, node_monitor = \
127 config.dispatch_classes()
128 server_calculator = build_server_calculator(config)
129 timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
130 launch_pollers(config, server_calculator)
131 cloud_node_updater = node_update.start(config.new_cloud_client, timer).tell_proxy()
132 node_daemon = NodeManagerDaemonActor.start(
133 job_queue_poller, arvados_node_poller, cloud_node_poller,
134 cloud_node_updater, timer,
135 config.new_arvados_client, config.new_cloud_client,
136 config.shutdown_windows(),
138 config.getint('Daemon', 'min_nodes'),
139 config.getint('Daemon', 'max_nodes'),
140 config.getint('Daemon', 'poll_stale_after'),
141 config.getint('Daemon', 'boot_fail_after'),
142 config.getint('Daemon', 'node_stale_after'),
143 node_setup, node_shutdown, node_monitor,
144 max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
146 watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
147 cloud_node_poller.actor_ref,
148 arvados_node_poller.actor_ref,
149 job_queue_poller.actor_ref,
150 node_daemon.actor_ref)
153 daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
154 while not daemon_stopped():
157 logging.exception("Uncaught exception during setup")
159 pykka.ActorRegistry.stop_all()
162 if __name__ == '__main__':