Merge branch '11305-migrate-docker19-doc'
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import argparse
6 import logging
7 import signal
8 import sys
9 import time
10
11 import daemon
12 import pykka
13 import libcloud
14
15 from . import config as nmconfig
16 from . import status
17 from .baseactor import WatchdogActor
18 from .daemon import NodeManagerDaemonActor
19 from .jobqueue import JobQueueMonitorActor, ServerCalculator
20 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
21 from .timedcallback import TimedCallBackActor
22 from ._version import __version__
23
# Daemon actor proxy shared by main() and shutdown_signal(). It stays None
# until main() has started NodeManagerDaemonActor; shutdown_signal() checks
# for None to decide whether there is a daemon to shut down.
24 node_daemon = None
25
def abort(msg, code=1):
    """Print *msg* prefixed with the program name, then exit.

    Always raises SystemExit carrying *code* (1 unless overridden), so
    callers never get control back.
    """
    message = "arvados-node-manager: " + msg
    print(message)
    raise SystemExit(code)
29
def parse_cli(args):
    """Parse the node manager's command-line options.

    *args* is the list of argument strings given to argparse.  The
    returned namespace carries ``foreground`` (bool, False by default)
    and ``config`` (path string, or None when not supplied).
    ``--version`` prints the program name and version and exits.
    """
    cli = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    version_text = "%s %s" % (sys.argv[0], __version__)
    cli.add_argument('--version',
                     action='version',
                     version=version_text,
                     help='Print version and exit.')
    cli.add_argument('--foreground',
                     action='store_true',
                     default=False,
                     help="Run in the foreground.  Don't daemonize.")
    cli.add_argument('--config',
                     help="Path to configuration file")
    return cli.parse_args(args)
44
def load_config(path):
    """Build a NodeManagerConfig from the file at *path*.

    Exits through abort() with status 2 when *path* is empty or None,
    and with abort()'s default status when the file cannot be opened or
    read.  Returns the populated configuration object otherwise.
    """
    if not path:
        abort("No --config file specified", 2)
    loaded = nmconfig.NodeManagerConfig()
    try:
        with open(path) as source:
            loaded.readfp(source)
    except (IOError, OSError) as err:
        # Report the failing path together with the OS-level reason.
        abort("Error reading configuration file {}: {}".format(path, err))
    return loaded
55
def setup_logging(path, level, **sublevels):
    """Send root-logger output to the file at *path*.

    A FileHandler with a timestamped "name[pid] LEVEL: message" format is
    attached to the root logger, whose threshold is set to *level*.

    Each keyword argument names a logger and gives the level to apply to
    it, so individual subsystems can be made more or less verbose than
    the root.

    Returns the root logger.
    """
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # items() rather than iteritems(): the latter exists only on Python 2
    # dicts, while items() behaves the same here on Python 2 and 3.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
    return root_logger
68
def build_server_calculator(config):
    """Create the ServerCalculator described by *config*.

    The sizes reported by a fresh cloud client are passed through
    config.node_sizes(); if the result is empty the process exits via
    abort().  The calculator is built from that size list plus the
    [Daemon] settings max_nodes, max_total_price and node_mem_scaling.
    """
    offered_sizes = config.new_cloud_client().list_sizes()
    usable_sizes = config.node_sizes(offered_sizes)
    if not usable_sizes:
        abort("No valid node sizes configured")
    max_nodes = config.getint('Daemon', 'max_nodes')
    max_total_price = config.getfloat('Daemon', 'max_total_price')
    node_mem_scaling = config.getfloat('Daemon', 'node_mem_scaling')
    return ServerCalculator(
        usable_sizes, max_nodes, max_total_price, node_mem_scaling)
77
def launch_pollers(config, server_calculator):
    """Start the timer actor and the three polling actors.

    The [Daemon] poll_time and max_poll_time settings are handed to every
    poller; the timer is started with one tenth of poll_time.  Each poller
    gets its own freshly created cloud or Arvados client.

    Returns a 4-tuple of tell proxies:
    (timer, cloud node poller, Arvados node poller, job queue poller).
    """
    interval = config.getint('Daemon', 'poll_time')
    max_interval = config.getint('Daemon', 'max_poll_time')

    timer = TimedCallBackActor.start(interval / 10.0).tell_proxy()

    cloud_ref = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, server_calculator,
        interval, max_interval)
    cloud_node_poller = cloud_ref.tell_proxy()

    arvados_ref = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer, interval, max_interval)
    arvados_node_poller = arvados_ref.tell_proxy()

    queue_ref = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        interval, max_interval)
    job_queue_poller = queue_ref.tell_proxy()

    return (timer, cloud_node_poller, arvados_node_poller, job_queue_poller)
91
# Number of times each signal number has been delivered so far.
_caught_signals = {}
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates on repeated delivery of a signal.

    If the daemon actor has not been started yet, all actors are stopped
    and the process exits at once.  Otherwise:

    * first delivery  -- ask the daemon to shut down;
    * second delivery -- stop every registered actor;
    * third and later -- exit the process.

    Exits use the negated signal number as the status.  *frame* is
    accepted for the signal-handler signature and is not used.
    """
    seen_before = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = seen_before + 1

    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)

    if seen_before == 0:
        node_daemon.shutdown()
        return
    if seen_before == 1:
        pykka.ActorRegistry.stop_all()
        return
    sys.exit(-signal_code)
105
def main(args=None):
    """Start the node manager and block until the daemon actor has stopped.

    *args* is passed unchanged to parse_cli().  After loading the
    configuration the process daemonizes (unless --foreground was given),
    installs shutdown_signal() for SIGINT, SIGQUIT and SIGTERM, and starts
    the status server.  It then sets up logging, starts the pollers, the
    node updater, the daemon actor and the watchdog, and waits.

    Any exception raised from logging setup onwards is logged rather than
    propagated, and every registered actor is stopped on the way out.
    """
    global node_daemon
    options = parse_cli(args)
    config = load_config(options.config)

    if not options.foreground:
        daemon.DaemonContext().open()
    for signum in (signal.SIGINT, signal.SIGQUIT, signal.SIGTERM):
        signal.signal(signum, shutdown_signal)

    status.Server(config).start()

    try:
        log_path = config.get('Logging', 'file')
        log = setup_logging(log_path, **config.log_levels())
        log.info("%s %s, libcloud %s",
                 sys.argv[0], __version__, libcloud.__version__)

        (node_setup, node_shutdown,
         node_update, node_monitor) = config.dispatch_classes()
        calculator = build_server_calculator(config)
        (timer, cloud_node_poller,
         arvados_node_poller, job_queue_poller) = launch_pollers(
             config, calculator)
        cloud_node_updater = node_update.start(
            config.new_cloud_client).tell_proxy()

        # Positional arguments for the daemon actor, in the order its
        # start() call expects them.
        daemon_args = [
            job_queue_poller, arvados_node_poller, cloud_node_poller,
            cloud_node_updater, timer,
            config.new_arvados_client, config.new_cloud_client,
            config.shutdown_windows(),
            calculator,
            config.getint('Daemon', 'min_nodes'),
            config.getint('Daemon', 'max_nodes'),
            config.getint('Daemon', 'poll_stale_after'),
            config.getint('Daemon', 'boot_fail_after'),
            config.getint('Daemon', 'node_stale_after'),
            node_setup, node_shutdown, node_monitor,
        ]
        node_daemon = NodeManagerDaemonActor.start(
            *daemon_args,
            max_total_price=config.getfloat('Daemon', 'max_total_price')
        ).tell_proxy()

        watchdog_timeout = config.getint('Daemon', 'watchdog')
        WatchdogActor.start(watchdog_timeout,
                            cloud_node_poller.actor_ref,
                            arvados_node_poller.actor_ref,
                            job_queue_poller.actor_ref,
                            node_daemon.actor_ref)

        # Sleep until a signal arrives, then poll once a second until the
        # daemon actor reports that it has stopped.
        signal.pause()
        stopped = node_daemon.actor_ref.actor_stopped
        while not stopped.is_set():
            time.sleep(1)
    except Exception:
        logging.exception("Uncaught exception during setup")
    finally:
        pykka.ActorRegistry.stop_all()
155
156
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
157 if __name__ == '__main__':
158     main()