10312: Integration test framework for node manager, runs full node manager with
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import argparse
6 import logging
7 import signal
8 import sys
9 import time
10
11 import daemon
12 import pykka
13 import libcloud
14
15 from . import config as nmconfig
16 from . import status
17 from .baseactor import WatchdogActor
18 from .daemon import NodeManagerDaemonActor
19 from .jobqueue import JobQueueMonitorActor, ServerCalculator
20 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
21 from .timedcallback import TimedCallBackActor
22 from ._version import __version__
23
# Module-level handles assigned by main() once the corresponding actors have
# been started.  shutdown_signal() reads them to decide how to shut down.
node_daemon = None
watchdog = None
26
def abort(msg, code=1):
    """Print msg prefixed with the program name, then exit the process.

    Arguments:
    * msg: Text appended to the "arvados-node-manager: " prefix.
    * code: Exit status handed to sys.exit().  Defaults to 1.
    """
    prefixed = "arvados-node-manager: " + msg
    print(prefixed)
    sys.exit(code)
30
def parse_cli(args):
    """Parse the node manager's command-line arguments.

    Recognized options are --version, --foreground and --config.

    Arguments:
    * args: List of argument strings, handed to
      ArgumentParser.parse_args() unchanged.

    Returns the namespace produced by argparse.
    """
    version_string = "%s %s" % (sys.argv[0], __version__)
    cli = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    cli.add_argument('--version',
                     action='version',
                     version=version_string,
                     help='Print version and exit.')
    cli.add_argument('--foreground',
                     action='store_true',
                     default=False,
                     help="Run in the foreground.  Don't daemonize.")
    cli.add_argument('--config',
                     help="Path to configuration file")
    return cli.parse_args(args)
45
def load_config(path):
    """Build a NodeManagerConfig and populate it from the file at path.

    Exits through abort() with status 2 when path is empty, and with
    abort()'s default status when the file cannot be opened or read.

    Returns the populated configuration object.
    """
    if not path:
        abort("No --config file specified", 2)
    node_config = nmconfig.NodeManagerConfig()
    try:
        with open(path) as source:
            node_config.readfp(source)
    except (IOError, OSError) as exc:
        abort("Error reading configuration file {}: {}".format(path, exc))
    return node_config
56
def setup_logging(path, level, **sublevels):
    """Attach a file handler to the root logger and set logger levels.

    Arguments:
    * path: Path of the log file the new handler writes to.
    * level: Level to set on the root logger.
    * sublevels: Logger names mapped to the level to set on each of
      those loggers.

    Returns the root logger.
    """
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # dict.items() exists on both Python 2 and Python 3; iteritems() is
    # Python 2 only and raises AttributeError on Python 3.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
    return root_logger
69
def build_server_calculator(config):
    """Create the ServerCalculator from the configured cloud node sizes.

    Asks a new cloud client for its size list, passes it through
    config.node_sizes(), and exits through abort() when the result is
    empty.  The remaining constructor arguments are read from the
    'Daemon' configuration section.
    """
    available_sizes = config.new_cloud_client().list_sizes()
    cloud_size_list = config.node_sizes(available_sizes)
    if not cloud_size_list:
        abort("No valid node sizes configured")
    max_nodes = config.getint('Daemon', 'max_nodes')
    max_total_price = config.getfloat('Daemon', 'max_total_price')
    node_mem_scaling = config.getfloat('Daemon', 'node_mem_scaling')
    return ServerCalculator(
        cloud_size_list, max_nodes, max_total_price, node_mem_scaling)
78
def launch_pollers(config, server_calculator):
    """Start the timer actor and the three list-polling actors.

    The poll intervals come from 'poll_time' and 'max_poll_time' in the
    'Daemon' configuration section; the timer is started with one tenth
    of poll_time.

    Returns a tuple of tell proxies, in this order: timer, cloud node
    poller, Arvados node poller, job queue poller.
    """
    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')
    intervals = (poll_time, max_poll_time)

    timer_actor = TimedCallBackActor.start(poll_time / 10.0)
    timer = timer_actor.tell_proxy()

    cloud_actor = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, server_calculator, *intervals)
    cloud_node_poller = cloud_actor.tell_proxy()

    arvados_actor = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer, *intervals)
    arvados_node_poller = arvados_actor.tell_proxy()

    job_queue_actor = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator, *intervals)
    job_queue_poller = job_queue_actor.tell_proxy()

    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
92
# Number of times each signal code has been delivered to shutdown_signal().
_caught_signals = {}
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates shutdown each time a signal repeats.

    * If the node daemon actor has not been created yet, stop every
      registered actor and exit immediately.
    * On the first delivery of a signal, stop the watchdog (if it
      exists) and ask the node daemon to shut down.
    * On the second delivery of the same signal, stop every registered
      actor.
    * On any later delivery, exit immediately.

    Arguments:
    * signal_code: Number of the signal being handled; the process exit
      status is its negation.
    * frame: Current stack frame supplied by the signal module.  Unused.
    """
    current_count = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = current_count + 1
    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
    elif current_count == 0:
        # main() assigns node_daemon before watchdog, so a signal that
        # arrives between the two assignments sees watchdog still None.
        # Without this guard the AttributeError would skip the daemon
        # shutdown request below.
        if watchdog is not None:
            watchdog.stop()
        node_daemon.shutdown()
    elif current_count == 1:
        pykka.ActorRegistry.stop_all()
    else:
        sys.exit(-signal_code)
107
def main(args=None):
    """Run the node manager until it is told to shut down.

    Parses the command line, loads the configuration, optionally
    daemonizes, installs the shutdown signal handlers, starts the status
    server and all actors, then blocks until the node daemon actor has
    stopped.  Every registered actor is stopped on the way out.

    Arguments:
    * args: Command-line argument list, passed to parse_cli() unchanged.
    """
    # Both globals are assigned below so shutdown_signal() can reach the
    # running actors.
    global node_daemon, watchdog
    args = parse_cli(args)
    config = load_config(args.config)

    # Daemonize unless --foreground was given.
    if not args.foreground:
        daemon.DaemonContext().open()
    # Route SIGINT, SIGQUIT and SIGTERM to shutdown_signal().
    for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
        signal.signal(sigcode, shutdown_signal)

    status.Server(config).start()

    # Everything from here on runs under the finally clause, so actors
    # started along the way are always stopped.
    try:
        root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
        root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
        node_setup, node_shutdown, node_update, node_monitor = \
            config.dispatch_classes()
        server_calculator = build_server_calculator(config)
        timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
            launch_pollers(config, server_calculator)
        cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
        # Once node_daemon is assigned, shutdown_signal() switches from
        # "exit immediately" to its staged shutdown.
        node_daemon = NodeManagerDaemonActor.start(
            job_queue_poller, arvados_node_poller, cloud_node_poller,
            cloud_node_updater, timer,
            config.new_arvados_client, config.new_cloud_client,
            config.shutdown_windows(),
            server_calculator,
            config.getint('Daemon', 'min_nodes'),
            config.getint('Daemon', 'max_nodes'),
            config.getint('Daemon', 'poll_stale_after'),
            config.getint('Daemon', 'boot_fail_after'),
            config.getint('Daemon', 'node_stale_after'),
            node_setup, node_shutdown, node_monitor,
            max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()

        # The watchdog is given the actor refs of the three pollers and
        # the node daemon.
        watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
                            cloud_node_poller.actor_ref,
                            arvados_node_poller.actor_ref,
                            job_queue_poller.actor_ref,
                            node_daemon.actor_ref)

        # Sleep until a signal has been handled, then poll once a second
        # until the node daemon actor reports that it has stopped.
        signal.pause()
        daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
        while not daemon_stopped():
            time.sleep(1)
    except Exception:
        # SystemExit is not an Exception subclass, so abort() and the
        # sys.exit() calls in shutdown_signal() bypass this handler and
        # go straight to the finally clause.  Note that this try block
        # also covers the wait loop, not only setup.
        logging.exception("Uncaught exception during setup")
    finally:
        pykka.ActorRegistry.stop_all()
156         pykka.ActorRegistry.stop_all()
157
158
# Run the launcher when this file is executed as a script.
if __name__ == '__main__':
    main()