3 from __future__ import absolute_import, print_function
14 from . import config as nmconfig
15 from .computenode.dispatch import ComputeNodeUpdateActor
16 from .daemon import NodeManagerDaemonActor
17 from .jobqueue import JobQueueMonitorActor, ServerCalculator
18 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
19 from .timedcallback import TimedCallBackActor
def abort(msg, code=1):
    """Report a fatal error and terminate the process.

    Prints ``msg`` prefixed with the program name, then exits with
    status ``code`` (1 unless the caller says otherwise).
    """
    print("arvados-node-manager: " + msg)
    # NOTE(review): the exit call was missing from the reviewed excerpt.
    # It is restored because ``code`` is otherwise unused and
    # load_config() relies on abort() not returning.
    sys.exit(code)
def parse_cli(args):
    """Parse the node manager's command-line arguments.

    ``args`` is passed straight to ``ArgumentParser.parse_args``.  The
    returned namespace has ``foreground`` (bool, False by default) and
    ``config`` (the given path, or None when the option is absent).
    """
    # NOTE(review): the ``def`` line and the two ``add_argument(`` openers
    # were missing from the reviewed excerpt; confirm against the original.
    parser = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    parser.add_argument(
        '--foreground', action='store_true', default=False,
        help="Run in the foreground. Don't daemonize.")
    parser.add_argument(
        '--config', help="Path to configuration file")
    return parser.parse_args(args)
def load_config(path):
    """Build a NodeManagerConfig populated from the file at ``path``.

    Aborts with status 2 when no path is given, and with abort()'s
    default status when the file cannot be opened or read.
    """
    # NOTE(review): this guard, the ``try:`` and the final ``return`` were
    # missing from the reviewed excerpt; the exact guard expression should
    # be confirmed against the original.
    if not path:
        abort("No --config file specified", 2)
    config = nmconfig.NodeManagerConfig()
    try:
        with open(path) as config_file:
            config.readfp(config_file)
    except (IOError, OSError) as error:
        abort("Error reading configuration file {}: {}".format(path, error))
    return config
def setup_logging(path, level, **sublevels):
    """Send root-logger output to a file and set logger levels.

    ``path`` is the log file to write to, ``level`` the root logger's
    level.  Each extra keyword argument names a logger and gives the level
    to set on it.
    """
    handler = logging.FileHandler(path)
    # NOTE(review): the second Formatter argument (the date format) was
    # missing from the reviewed excerpt; the value below is an assumption
    # and must be confirmed against the original.
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # items() rather than the Python 2-only iteritems(): same result here,
    # and it also runs on Python 3.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
def launch_pollers(config):
    """Start the timer actor and the three polling actors.

    Returns a tuple of proxies:
    ``(timer, cloud_node_poller, arvados_node_poller, job_queue_poller)``.
    Aborts when the configuration yields no usable node sizes.
    """
    cloud_client = config.new_cloud_client()
    arvados_client = config.new_arvados_client()
    cloud_size_list = config.node_sizes(cloud_client.list_sizes())
    if not cloud_size_list:
        abort("No valid node sizes configured")

    # NOTE(review): the first ServerCalculator argument was missing from
    # the reviewed excerpt.  ``cloud_size_list`` is restored because it is
    # computed above and not used anywhere else; confirm against the
    # original.
    server_calculator = ServerCalculator(
        cloud_size_list,
        config.getint('Daemon', 'min_nodes'),
        config.getint('Daemon', 'max_nodes'))
    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')

    # The timer is started with a tenth of the poll time (float division).
    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
    cloud_node_poller = CloudNodeListMonitorActor.start(
        cloud_client, timer, poll_time, max_poll_time).proxy()
    arvados_node_poller = ArvadosNodeListMonitorActor.start(
        arvados_client, timer, poll_time, max_poll_time).proxy()
    # The job queue poller gets a client of its own instead of sharing
    # ``arvados_client`` with the node list poller.
    job_queue_poller = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        poll_time, max_poll_time).proxy()
    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates on repeated deliveries of a signal.

    Counts deliveries per signal in ``_caught_signals``.  With no daemon
    running, all actors are stopped and the process exits immediately.
    Otherwise the first delivery calls ``node_daemon.shutdown()``, the
    second stops every actor, and any later one exits the process with
    status ``-signal_code``.
    """
    current_count = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = current_count + 1
    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
    elif current_count == 0:
        node_daemon.shutdown()
    elif current_count == 1:
        pykka.ActorRegistry.stop_all()
    # NOTE(review): the line introducing this final branch was missing
    # from the reviewed excerpt; ``else:`` is assumed because it is the
    # reading under which the counter drives a third escalation step.
    else:
        sys.exit(-signal_code)
# NOTE(review): the ``def`` line, the ``global`` declaration and the body
# of the ``while`` loop were missing from the reviewed excerpt.  The
# signature, the declaration and the loop body below are reconstructions
# and must be confirmed against the original.
def main(args=None):
    """Run the node manager until its daemon actor has stopped.

    Parses ``args``, loads the configuration, daemonizes unless
    ``--foreground`` was given, installs the shutdown signal handler, sets
    up logging, starts the pollers and the daemon actor, then waits for
    the daemon actor to stop before stopping every remaining actor.
    """
    # shutdown_signal() reads the module-level ``node_daemon``, so the
    # assignment below has to rebind that global, not a local.
    global node_daemon
    args = parse_cli(args)
    config = load_config(args.config)

    if not args.foreground:
        daemon.DaemonContext().open()
    for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
        signal.signal(sigcode, shutdown_signal)

    setup_logging(config.get('Logging', 'file'), **config.log_levels())
    timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
        launch_pollers(config)
    # The client factories are passed uncalled here and below.
    cloud_node_updater = ComputeNodeUpdateActor.start(
        config.new_cloud_client).proxy()
    node_daemon = NodeManagerDaemonActor.start(
        job_queue_poller, arvados_node_poller, cloud_node_poller,
        cloud_node_updater, timer,
        config.new_arvados_client, config.new_cloud_client,
        config.shutdown_windows(),
        config.getint('Daemon', 'min_nodes'),
        config.getint('Daemon', 'max_nodes'),
        config.getint('Daemon', 'poll_stale_after'),
        config.getint('Daemon', 'node_stale_after')).proxy()

    daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
    while not daemon_stopped():
        # Assumed loop body: sleep until the next signal arrives, then
        # re-check whether the daemon actor has stopped.
        signal.pause()
    pykka.ActorRegistry.stop_all()
131 if __name__ == '__main__':