3 from __future__ import absolute_import, print_function
14 from . import config as nmconfig
15 from .computenode import \
16 ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \
18 from .daemon import NodeManagerDaemonActor
19 from .jobqueue import JobQueueMonitorActor, ServerCalculator
20 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
21 from .timedcallback import TimedCallBackActor
def abort(msg, code=1):
    """Report a fatal error and terminate the process.

    msg is printed with an "arvados-node-manager: " prefix, then the
    process exits with status `code` (default 1) by raising SystemExit.
    """
    # Diagnostics belong on stderr so they never mix with regular output.
    print("arvados-node-manager: " + msg, file=sys.stderr)
    sys.exit(code)
def parse_cli(args):
    """Parse the node manager's command-line arguments.

    args is the list of argument strings passed to argparse.  Returns the
    argparse.Namespace, which carries `foreground` (bool, default False)
    and `config` (the configuration file path, None when not given).
    """
    parser = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    parser.add_argument(
        '--foreground', action='store_true', default=False,
        help="Run in the foreground. Don't daemonize.")
    parser.add_argument(
        '--config', help="Path to configuration file")
    return parser.parse_args(args)
def load_config(path):
    """Load the node manager configuration from the file at `path`.

    Calls abort() with exit status 2 when no path is given, and with the
    default status when the file cannot be opened or read.  Returns the
    populated nmconfig.NodeManagerConfig object.
    """
    if not path:
        abort("No --config file specified", 2)
    config = nmconfig.NodeManagerConfig()
    try:
        with open(path) as config_file:
            config.readfp(config_file)
    except (IOError, OSError) as error:
        abort("Error reading configuration file {}: {}".format(path, error))
    return config
def setup_logging(path, level, **sublevels):
    """Attach a file log handler to the root logger and set logger levels.

    path is the log file to write.  level is set on the root logger.
    Each keyword argument names a logger and gives the level to set on it.
    """
    handler = logging.FileHandler(path)
    # NOTE(review): the second Formatter argument (the date format) is
    # missing from the damaged source this block was rebuilt from;
    # '%Y-%m-%d %H:%M:%S' is an assumption -- confirm against the original.
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
def launch_pollers(config):
    """Start the timer actor and the three polling actors.

    config supplies the cloud and Arvados clients, the usable node sizes
    and the [Daemon] settings min_nodes, max_nodes, poll_time and
    max_poll_time.  Calls abort() when config.node_sizes() yields nothing.

    Returns a tuple of actor proxies:
    (timer, cloud_node_poller, arvados_node_poller, job_queue_poller).
    """
    cloud_client = config.new_cloud_client()
    arvados_client = config.new_arvados_client()
    cloud_size_list = config.node_sizes(cloud_client.list_sizes())
    if not cloud_size_list:
        abort("No valid node sizes configured")

    # NOTE(review): the first ServerCalculator argument is missing from the
    # damaged source this block was rebuilt from; cloud_size_list is
    # assumed -- confirm against the original.
    server_calculator = ServerCalculator(
        cloud_size_list,
        config.getint('Daemon', 'min_nodes'),
        config.getint('Daemon', 'max_nodes'))
    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')

    # The timer is started with one tenth of the configured poll time.
    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
    cloud_node_poller = CloudNodeListMonitorActor.start(
        cloud_client, timer, poll_time, max_poll_time).proxy()
    arvados_node_poller = ArvadosNodeListMonitorActor.start(
        arvados_client, timer, poll_time, max_poll_time).proxy()
    # The job queue poller is given a fresh Arvados client rather than
    # sharing arvados_client with the node list poller.
    job_queue_poller = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        poll_time, max_poll_time).proxy()
    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates shutdown as a signal is repeated.

    Each call bumps the per-signal counter in _caught_signals, then:
      * if no node daemon exists, stops all actors and exits at once;
      * on the first occurrence of this signal, calls node_daemon.shutdown();
      * on the second, stops every registered actor;
      * on any later one, exits the process.

    The exit status passed to sys.exit() is the negated signal number.
    frame is accepted for the signal-handler signature and is unused.
    """
    current_count = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = current_count + 1
    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
    elif current_count == 0:
        node_daemon.shutdown()
    elif current_count == 1:
        pykka.ActorRegistry.stop_all()
    else:
        sys.exit(-signal_code)
103 args = parse_cli(args)
104 config = load_config(args.config)
106 if not args.foreground:
107 daemon.DaemonContext().open()
108 for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
109 signal.signal(sigcode, shutdown_signal)
111 setup_logging(config.get('Logging', 'file'), **config.log_levels())
112 timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
113 launch_pollers(config)
114 cloud_node_updater = ComputeNodeUpdateActor.start(
115 config.new_cloud_client).proxy()
116 node_daemon = NodeManagerDaemonActor.start(
117 job_queue_poller, arvados_node_poller, cloud_node_poller,
118 cloud_node_updater, timer,
119 config.new_arvados_client, config.new_cloud_client,
120 config.shutdown_windows(),
121 config.getint('Daemon', 'min_nodes'),
122 config.getint('Daemon', 'max_nodes'),
123 config.getint('Daemon', 'poll_stale_after'),
124 config.getint('Daemon', 'node_stale_after')).proxy()
127 daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
128 while not daemon_stopped():
130 pykka.ActorRegistry.stop_all()
133 if __name__ == '__main__':