11121: Merge branch 'master' into 11121-crunch-output-collection-owner
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import argparse
6 import logging
7 import signal
8 import sys
9 import time
10
11 import daemon
12 import pykka
13 import libcloud
14
15 from . import config as nmconfig
16 from .baseactor import WatchdogActor
17 from .daemon import NodeManagerDaemonActor
18 from .jobqueue import JobQueueMonitorActor, ServerCalculator
19 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
20 from .timedcallback import TimedCallBackActor
21 from ._version import __version__
22
# Proxy for the running NodeManagerDaemonActor.  It stays None until main()
# has started the daemon actor; shutdown_signal() checks it to decide how to
# react to a termination signal.
node_daemon = None
24
def abort(msg, code=1):
    """Report a fatal error and terminate the process.

    The message is written to standard error (prefixed with the program
    name) so that diagnostics are not mixed into regular standard output.

    Arguments:
    * msg: Human-readable description of the failure.
    * code: Process exit status to use.  Defaults to 1.

    Raises SystemExit (via sys.exit); this function never returns.
    """
    print("arvados-node-manager: " + msg, file=sys.stderr)
    sys.exit(code)
28
def parse_cli(args):
    """Parse the node manager's command-line options.

    Arguments:
    * args: List of argument strings to hand to argparse.  None makes
      argparse fall back to the process's own command line.

    Returns the argparse namespace, which carries the `foreground` flag
    and the `config` path (None when --config was not given).
    """
    version_text = "%s %s" % (sys.argv[0], __version__)
    # (flags, keyword settings) for each supported option, in help order.
    option_table = (
        (('--version',),
         dict(action='version', version=version_text,
              help='Print version and exit.')),
        (('--foreground',),
         dict(action='store_true', default=False,
              help="Run in the foreground.  Don't daemonize.")),
        (('--config',),
         dict(help="Path to configuration file")),
    )
    cli = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    return cli.parse_args(args)
43
def load_config(path):
    """Read the node manager configuration file at `path`.

    Arguments:
    * path: Filesystem path of the configuration file.  A false value
      (None or an empty string) aborts the program with exit status 2.

    Returns a NodeManagerConfig populated from the file.  If the file
    cannot be opened or read (IOError/OSError), the program is aborted
    with an explanatory message and abort()'s default exit status.
    """
    if not path:
        abort("No --config file specified", 2)

    node_config = nmconfig.NodeManagerConfig()
    try:
        with open(path) as stream:
            node_config.readfp(stream)
    except (IOError, OSError) as read_error:
        failure = "Error reading configuration file {}: {}".format(
            path, read_error)
        abort(failure)
    return node_config
54
def setup_logging(path, level, **sublevels):
    """Route root-logger output to a file and set logger levels.

    Arguments:
    * path: Path of the log file; a logging.FileHandler is attached to the
      root logger for it.
    * level: Level to set on the root logger.
    * **sublevels: Maps logger names to the level each of those named
      loggers should be set to.

    Returns the root logger.
    """
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # items() is available on both Python 2 and Python 3, whereas
    # iteritems() only exists on Python 2.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
    return root_logger
67
def build_server_calculator(config):
    """Build the ServerCalculator from the configured cloud node sizes.

    Asks a fresh cloud client for its size list, lets `config.node_sizes`
    reduce that to the configured sizes, and aborts the program when the
    result is empty.

    Arguments:
    * config: The loaded node manager configuration.

    Returns a ServerCalculator constructed from the size list plus the
    Daemon section's `max_nodes` and `max_total_price` settings.
    """
    cloud_client = config.new_cloud_client()
    usable_sizes = config.node_sizes(cloud_client.list_sizes())
    if not usable_sizes:
        abort("No valid node sizes configured")
    node_limit = config.getint('Daemon', 'max_nodes')
    price_limit = config.getfloat('Daemon', 'max_total_price')
    return ServerCalculator(usable_sizes, node_limit, price_limit)
75
def launch_pollers(config, server_calculator):
    """Start the timer actor and the three polling actors.

    Arguments:
    * config: The loaded node manager configuration; supplies the Daemon
      section's `poll_time` / `max_poll_time` and new API clients.
    * server_calculator: Passed to the cloud node and job queue pollers.

    Returns a 4-tuple of tell proxies:
    (timer, cloud_node_poller, arvados_node_poller, job_queue_poller).
    """
    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')

    # The timer actor is started with one tenth of the poll time.
    timer_ref = TimedCallBackActor.start(poll_time / 10.0)
    timer = timer_ref.tell_proxy()

    # Each poller is handed its own freshly created client.
    cloud_ref = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, server_calculator,
        poll_time, max_poll_time)
    cloud_node_poller = cloud_ref.tell_proxy()

    arvados_ref = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer, poll_time, max_poll_time)
    arvados_node_poller = arvados_ref.tell_proxy()

    job_queue_ref = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        poll_time, max_poll_time)
    job_queue_poller = job_queue_ref.tell_proxy()

    return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
89
# How many times each signal code has been delivered to shutdown_signal().
_caught_signals = {}
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates shutdown on repeated signals.

    When no daemon actor is running yet, all actors are stopped and the
    process exits right away.  Otherwise the reaction depends on how many
    times this same signal was seen before:
    * first time: ask the daemon actor to shut down;
    * second time: stop every registered actor;
    * after that: exit the process.

    Arguments:
    * signal_code: Number of the signal being handled.
    * frame: Current stack frame (unused).
    """
    seen_before = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = seen_before + 1

    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)

    if seen_before == 0:
        node_daemon.shutdown()
        return
    if seen_before == 1:
        pykka.ActorRegistry.stop_all()
        return
    sys.exit(-signal_code)
103
def main(args=None):
    """Run the node manager: parse options, start all actors, and wait.

    Steps, in order: parse the command line and load the configuration;
    daemonize unless --foreground was given; install shutdown_signal for
    SIGINT, SIGQUIT and SIGTERM; set up logging; start the pollers, the
    cloud node updater, the daemon actor and the watchdog; then block until
    a signal arrives and the daemon actor has stopped.

    Any Exception raised after the signal handlers are installed is logged,
    and all registered actors are stopped on the way out in every case.

    Arguments:
    * args: Command-line argument list passed to parse_cli().
    """
    global node_daemon
    args = parse_cli(args)
    config = load_config(args.config)

    if not args.foreground:
        daemon.DaemonContext().open()
    for sigcode in (signal.SIGINT, signal.SIGQUIT, signal.SIGTERM):
        signal.signal(sigcode, shutdown_signal)

    try:
        # log_levels() supplies setup_logging's keyword arguments.
        log_path = config.get('Logging', 'file')
        root_logger = setup_logging(log_path, **config.log_levels())
        root_logger.info("%s %s, libcloud %s",
                         sys.argv[0], __version__, libcloud.__version__)

        (node_setup, node_shutdown,
         node_update, node_monitor) = config.dispatch_classes()
        server_calculator = build_server_calculator(config)
        pollers = launch_pollers(config, server_calculator)
        timer, cloud_node_poller, arvados_node_poller, job_queue_poller = pollers

        updater_ref = node_update.start(config.new_cloud_client)
        cloud_node_updater = updater_ref.tell_proxy()

        # Positional arguments for the daemon actor, gathered in the order
        # its start() call expects them.
        daemon_positional = (
            job_queue_poller, arvados_node_poller, cloud_node_poller,
            cloud_node_updater, timer,
            config.new_arvados_client, config.new_cloud_client,
            config.shutdown_windows(),
            server_calculator,
            config.getint('Daemon', 'min_nodes'),
            config.getint('Daemon', 'max_nodes'),
            config.getint('Daemon', 'poll_stale_after'),
            config.getint('Daemon', 'boot_fail_after'),
            config.getint('Daemon', 'node_stale_after'),
            node_setup, node_shutdown, node_monitor,
        )
        price_cap = config.getfloat('Daemon', 'max_total_price')
        daemon_ref = NodeManagerDaemonActor.start(
            *daemon_positional, max_total_price=price_cap)
        node_daemon = daemon_ref.tell_proxy()

        watchdog_setting = config.getint('Daemon', 'watchdog')
        WatchdogActor.start(watchdog_setting,
                            cloud_node_poller.actor_ref,
                            arvados_node_poller.actor_ref,
                            job_queue_poller.actor_ref,
                            node_daemon.actor_ref)

        # Sleep until a signal is delivered, then poll once a second until
        # the daemon actor reports that it has stopped.
        signal.pause()
        stopped_flag = node_daemon.actor_ref.actor_stopped
        while not stopped_flag.is_set():
            time.sleep(1)
    except Exception:
        logging.exception("Uncaught exception during setup")
    finally:
        pykka.ActorRegistry.stop_all()
151
152
# Run the node manager when this file is executed directly as a script.
if __name__ == '__main__':
    main()