Merge branch 'master' into 8876-work-unit
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import argparse
6 import logging
7 import signal
8 import sys
9 import time
10
11 import daemon
12 import pykka
13
14 from . import config as nmconfig
15 from .baseactor import WatchdogActor
16 from .daemon import NodeManagerDaemonActor
17 from .jobqueue import JobQueueMonitorActor, ServerCalculator
18 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
19 from .timedcallback import TimedCallBackActor
20
# Tell proxy for the running NodeManagerDaemonActor.  It stays None until
# main() has started the daemon actor; shutdown_signal() reads it to decide
# how to react to a signal.
node_daemon = None
22
def abort(msg, code=1):
    """Print `msg` prefixed with the program name, then exit the process.

    `code` is handed to sys.exit() as the exit status (default 1).  This
    function does not return normally: sys.exit() raises SystemExit.
    """
    full_message = "arvados-node-manager: " + msg
    print(full_message)
    sys.exit(code)
26
def parse_cli(args):
    """Parse Node Manager's command-line arguments.

    `args` is the list of argument strings given to
    ArgumentParser.parse_args().  Returns the resulting namespace, which has
    a boolean `foreground` attribute and a `config` attribute (None when
    --config was not given).
    """
    cli = argparse.ArgumentParser(
        description="Dynamically allocate Arvados cloud compute nodes",
        prog='arvados-node-manager')
    # Each entry is (flag, keyword arguments for add_argument).
    option_table = [
        ('--foreground', {
            'action': 'store_true',
            'default': False,
            'help': "Run in the foreground.  Don't daemonize.",
        }),
        ('--config', {
            'help': "Path to configuration file",
        }),
    ]
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args(args)
37
def load_config(path):
    """Read the configuration file at `path` into a NodeManagerConfig.

    Calls abort() with exit code 2 when `path` is empty or None, and with
    the default exit code when the file cannot be opened or read
    (IOError/OSError).  Returns the populated configuration object.
    """
    if not path:
        abort("No --config file specified", 2)
    loaded = nmconfig.NodeManagerConfig()
    try:
        with open(path) as source:
            loaded.readfp(source)
    except (IOError, OSError) as error:
        failure = "Error reading configuration file {}: {}".format(path, error)
        abort(failure)
    return loaded
48
def setup_logging(path, level, **sublevels):
    """Attach a file handler to the root logger and set logger levels.

    Arguments:
      path: file that log records are appended to (opened by
        logging.FileHandler).
      level: level applied to the root logger.
      **sublevels: maps logger names to the level each named logger should
        use.

    The handler formats records as
    "YYYY-MM-DD HH:MM:SS name[pid] LEVEL: message".
    """
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # Use items() rather than iteritems(): the latter only exists on Python 2
    # dicts and raises AttributeError on Python 3.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
60
def build_server_calculator(config):
    """Build the ServerCalculator from the cloud's sizes and Daemon limits.

    Asks a new cloud client for its size list, passes it through
    config.node_sizes(), and calls abort() if nothing comes back.  The
    calculator is constructed with that list plus the 'max_nodes' and
    'max_total_price' settings from the [Daemon] section.
    """
    offered_sizes = config.new_cloud_client().list_sizes()
    cloud_size_list = config.node_sizes(offered_sizes)
    if not cloud_size_list:
        abort("No valid node sizes configured")
    node_limit = config.getint('Daemon', 'max_nodes')
    price_limit = config.getfloat('Daemon', 'max_total_price')
    return ServerCalculator(cloud_size_list, node_limit, price_limit)
68
def launch_pollers(config, server_calculator):
    """Start the timer actor and the three polling actors.

    Poll intervals come from 'poll_time' and 'max_poll_time' in the [Daemon]
    section; the timer actor is started with a tenth of 'poll_time'.  Each
    poller is given its own freshly created cloud or Arvados client.

    Returns a 4-tuple of tell proxies:
    (timer, cloud node poller, Arvados node poller, job queue poller).
    """
    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')
    # Every poller takes the same trailing (poll_time, max_poll_time) pair.
    poll_bounds = (poll_time, max_poll_time)

    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()

    cloud_actor = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, *poll_bounds)
    cloud_node_poller = cloud_actor.tell_proxy()

    arvados_actor = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer, *poll_bounds)
    arvados_node_poller = arvados_actor.tell_proxy()

    job_queue_actor = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator, *poll_bounds)
    job_queue_poller = job_queue_actor.tell_proxy()

    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
82
# Number of times each signal code has been delivered to shutdown_signal().
_caught_signals = {}

def shutdown_signal(signal_code, frame):
    """Signal handler that escalates the shutdown on repeated signals.

    Deliveries are counted separately for each signal code:
      * If the daemon actor has not been started yet (node_daemon is None),
        stop all actors and exit right away.
      * First delivery: ask the daemon actor to shut down.
      * Second delivery: stop every registered actor.
      * Any later delivery: exit the process.

    The exit status passed to sys.exit() is the negated signal code.
    """
    times_seen = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = times_seen + 1

    if node_daemon is None:
        # Nothing to shut down gracefully; fall through to the exit below.
        pykka.ActorRegistry.stop_all()
    elif times_seen == 0:
        node_daemon.shutdown()
        return
    elif times_seen == 1:
        pykka.ActorRegistry.stop_all()
        return
    sys.exit(-signal_code)
96
def main(args=None):
    """Entry point: configure, start all actors, and wait for shutdown.

    `args` is the command-line argument list handed to parse_cli().

    Steps, in order: parse the command line and load the configuration;
    daemonize unless --foreground was given; install shutdown_signal() for
    SIGINT, SIGQUIT and SIGTERM; set up logging; start the pollers, the node
    updater, the daemon actor (stored in the module-level `node_daemon`) and
    the watchdog; then block until a signal arrives and the daemon actor has
    stopped.  Any exception raised after the signal handlers are installed
    is logged, and all actors are stopped on the way out in every case.
    """
    global node_daemon
    cli_options = parse_cli(args)
    config = load_config(cli_options.config)

    if not cli_options.foreground:
        daemon.DaemonContext().open()
    for sigcode in (signal.SIGINT, signal.SIGQUIT, signal.SIGTERM):
        signal.signal(sigcode, shutdown_signal)

    try:
        log_path = config.get('Logging', 'file')
        setup_logging(log_path, **config.log_levels())
        (node_setup, node_shutdown,
         node_update, node_monitor) = config.dispatch_classes()
        server_calculator = build_server_calculator(config)
        pollers = launch_pollers(config, server_calculator)
        timer, cloud_node_poller, arvados_node_poller, job_queue_poller = pollers
        updater_ref = node_update.start(config.new_cloud_client)
        cloud_node_updater = updater_ref.tell_proxy()
        daemon_actor_ref = NodeManagerDaemonActor.start(
            job_queue_poller, arvados_node_poller, cloud_node_poller,
            cloud_node_updater, timer,
            config.new_arvados_client, config.new_cloud_client,
            config.shutdown_windows(),
            server_calculator,
            config.getint('Daemon', 'min_nodes'),
            config.getint('Daemon', 'max_nodes'),
            config.getint('Daemon', 'poll_stale_after'),
            config.getint('Daemon', 'boot_fail_after'),
            config.getint('Daemon', 'node_stale_after'),
            node_setup, node_shutdown, node_monitor,
            max_total_price=config.getfloat('Daemon', 'max_total_price'))
        node_daemon = daemon_actor_ref.tell_proxy()

        watchdog_setting = config.getint('Daemon', 'watchdog')
        watched_refs = [
            cloud_node_poller.actor_ref,
            arvados_node_poller.actor_ref,
            job_queue_poller.actor_ref,
            node_daemon.actor_ref,
        ]
        WatchdogActor.start(watchdog_setting, *watched_refs)

        # Sleep until a signal handler has run, then poll once a second
        # until the daemon actor reports that it has stopped.
        signal.pause()
        stopped_event = node_daemon.actor_ref.actor_stopped
        while not stopped_event.is_set():
            time.sleep(1)
    except Exception:
        logging.exception("Uncaught exception during setup")
    finally:
        pykka.ActorRegistry.stop_all()
143
144
# Run Node Manager when this file is executed as a script rather than imported.
if __name__ == '__main__':
    main()