11209: Restore missing import.
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import argparse
6 import logging
7 import signal
8 import sys
9 import time
10
11 import daemon
12 import pykka
13 import libcloud
14
15 from . import config as nmconfig
16 from .baseactor import WatchdogActor
17 from .daemon import NodeManagerDaemonActor
18 from .jobqueue import JobQueueMonitorActor, ServerCalculator
19 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
20 from .timedcallback import TimedCallBackActor
21 from ._version import __version__
22
# Tell-proxy for the running NodeManagerDaemonActor.  It stays None until
# main() has started the daemon actor; shutdown_signal() checks it to decide
# whether a graceful daemon shutdown can be requested.
node_daemon = None
24
def abort(msg, code=1):
    """Print msg prefixed with the program name, then exit the process.

    msg must be a string (it is concatenated onto the prefix).  code is
    the exit status passed along with the SystemExit; it defaults to 1.
    """
    banner = "arvados-node-manager: " + msg
    print(banner)
    # sys.exit() does nothing more than raise SystemExit with this status.
    raise SystemExit(code)
28
def parse_cli(args):
    """Parse the Node Manager command line and return the argparse namespace.

    args is the list of argument strings handed to argparse (passing None
    makes argparse read sys.argv[1:]).  The returned namespace carries
    `foreground` (bool, False unless --foreground is given) and `config`
    (the --config value, or None when it is omitted).  --version prints
    the program path and package version, then exits.
    """
    version_text = "%s %s" % (sys.argv[0], __version__)
    cli = argparse.ArgumentParser(
        description="Dynamically allocate Arvados cloud compute nodes",
        prog='arvados-node-manager')
    cli.add_argument('--version',
                     action='version',
                     version=version_text,
                     help='Print version and exit.')
    cli.add_argument('--foreground',
                     action='store_true',
                     default=False,
                     help="Run in the foreground.  Don't daemonize.")
    cli.add_argument('--config', help="Path to configuration file")
    return cli.parse_args(args)
43
def load_config(path):
    """Read the configuration file at path into a NodeManagerConfig.

    Calls abort() with status 2 when path is empty or None, and with the
    default status when the file cannot be opened or read.  Returns the
    populated NodeManagerConfig object.
    """
    if not path:
        abort("No --config file specified", 2)
    settings = nmconfig.NodeManagerConfig()
    try:
        with open(path) as stream:
            settings.readfp(stream)
    except (IOError, OSError) as err:
        failure = "Error reading configuration file {}: {}".format(path, err)
        abort(failure)
    return settings
54
def setup_logging(path, level, **sublevels):
    """Attach a file handler to the root logger and set logger levels.

    path is the log file to write to.  level is applied to the root
    logger.  Each extra keyword argument names a logger and gives the
    level to set on it.  Returns the root logger.

    Every call adds a new FileHandler to the root logger; existing
    handlers are left in place.
    """
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # dict.items() exists on both Python 2 and 3; the previous iteritems()
    # call raised AttributeError on Python 3.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
    return root_logger
67
def build_server_calculator(config):
    """Build a ServerCalculator from the configured node sizes and limits.

    Asks a new cloud client for its size list, passes that list through
    config.node_sizes(), and calls abort() if the result is empty.  The
    'Daemon' section supplies max_nodes, max_total_price and
    node_mem_scaling for the calculator.
    """
    offered_sizes = config.new_cloud_client().list_sizes()
    cloud_size_list = config.node_sizes(offered_sizes)
    if not cloud_size_list:
        abort("No valid node sizes configured")
    node_limit = config.getint('Daemon', 'max_nodes')
    price_limit = config.getfloat('Daemon', 'max_total_price')
    mem_scaling = config.getfloat('Daemon', 'node_mem_scaling')
    return ServerCalculator(
        cloud_size_list, node_limit, price_limit, mem_scaling)
76
def launch_pollers(config, server_calculator):
    """Start the timer actor and the three polling actors.

    Reads poll_time and max_poll_time from the 'Daemon' section.  The
    timer is started with poll_time / 10.0; each poller gets its own
    freshly created client plus the shared timer and both poll times.

    Returns tell-proxies as the tuple
    (timer, cloud_node_poller, arvados_node_poller, job_queue_poller).
    """
    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')
    intervals = (poll_time, max_poll_time)

    timer_ref = TimedCallBackActor.start(poll_time / 10.0)
    timer = timer_ref.tell_proxy()

    cloud_ref = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, server_calculator, *intervals)
    cloud_node_poller = cloud_ref.tell_proxy()

    arvados_ref = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer, *intervals)
    arvados_node_poller = arvados_ref.tell_proxy()

    queue_ref = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator, *intervals)
    job_queue_poller = queue_ref.tell_proxy()

    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
90
# How many times each signal number has been delivered so far.
_caught_signals = {}

def shutdown_signal(signal_code, frame):
    """Signal handler that escalates the shutdown on repeated signals.

    With no daemon actor running yet, all actors are stopped and the
    process exits at once.  Otherwise the first delivery of a given
    signal asks the daemon to shut down, the second stops every
    registered actor, and any later one exits the process.  Exits use
    the negated signal number as the status.
    """
    seen_before = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = seen_before + 1
    if node_daemon is None:
        # Nothing to shut down gracefully: stop the actors and leave.
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
    elif not seen_before:
        node_daemon.shutdown()
    elif seen_before == 1:
        pykka.ActorRegistry.stop_all()
    else:
        sys.exit(-signal_code)
104
def main(args=None):
    """Entry point: configure, start all Node Manager actors, and wait.

    args is the command-line argument list passed to parse_cli().  Unless
    --foreground was given the process daemonizes first.  SIGINT, SIGQUIT
    and SIGTERM are routed to shutdown_signal().  After every actor is
    running, the function blocks until a signal arrives and then polls
    once a second until the daemon actor has stopped.

    Any Exception raised along the way is logged, and all registered
    actors are stopped on the way out regardless of how the function ends.
    """
    global node_daemon
    options = parse_cli(args)
    config = load_config(options.config)

    if not options.foreground:
        daemon.DaemonContext().open()
    for signum in (signal.SIGINT, signal.SIGQUIT, signal.SIGTERM):
        signal.signal(signum, shutdown_signal)

    try:
        log = setup_logging(config.get('Logging', 'file'),
                            **config.log_levels())
        log.info("%s %s, libcloud %s",
                 sys.argv[0], __version__, libcloud.__version__)
        (node_setup, node_shutdown,
         node_update, node_monitor) = config.dispatch_classes()
        server_calculator = build_server_calculator(config)
        (timer, cloud_node_poller,
         arvados_node_poller, job_queue_poller) = launch_pollers(
             config, server_calculator)
        updater_ref = node_update.start(config.new_cloud_client)
        cloud_node_updater = updater_ref.tell_proxy()
        node_daemon = NodeManagerDaemonActor.start(
            job_queue_poller, arvados_node_poller, cloud_node_poller,
            cloud_node_updater, timer,
            config.new_arvados_client, config.new_cloud_client,
            config.shutdown_windows(),
            server_calculator,
            config.getint('Daemon', 'min_nodes'),
            config.getint('Daemon', 'max_nodes'),
            config.getint('Daemon', 'poll_stale_after'),
            config.getint('Daemon', 'boot_fail_after'),
            config.getint('Daemon', 'node_stale_after'),
            node_setup, node_shutdown, node_monitor,
            max_total_price=config.getfloat('Daemon', 'max_total_price')
        ).tell_proxy()

        # The watchdog is handed the actor refs of the pollers and daemon.
        watchdog_setting = config.getint('Daemon', 'watchdog')
        watched_refs = [proxy.actor_ref for proxy in (
            cloud_node_poller, arvados_node_poller,
            job_queue_poller, node_daemon)]
        WatchdogActor.start(watchdog_setting, *watched_refs)

        # Sleep until a signal handler has run, then wait for the daemon
        # actor to finish stopping.
        signal.pause()
        stopped_flag = node_daemon.actor_ref.actor_stopped
        while not stopped_flag.is_set():
            time.sleep(1)
    except Exception:
        logging.exception("Uncaught exception during setup")
    finally:
        pykka.ActorRegistry.stop_all()
151         pykka.ActorRegistry.stop_all()
152
153
# Run the Node Manager when this file is executed directly as a script.
if __name__ == '__main__':
    main()