10383: Merge branch 'master' into 10383-arv-put-incremental-upload
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import argparse
6 import logging
7 import signal
8 import sys
9 import time
10
11 import daemon
12 import pykka
13
14 from . import config as nmconfig
15 from .baseactor import WatchdogActor
16 from .daemon import NodeManagerDaemonActor
17 from .jobqueue import JobQueueMonitorActor, ServerCalculator
18 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
19 from .timedcallback import TimedCallBackActor
20 from ._version import __version__
21
# Tell-proxy for the running NodeManagerDaemonActor.  It stays None until
# main() has started the daemon actor.  shutdown_signal() checks it to choose
# between exiting immediately and asking the daemon to shut down gracefully.
node_daemon = None
23
def abort(msg, code=1):
    """Report a fatal error and terminate the process.

    The message is prefixed with the program name and written to stderr.
    That is where argparse reports its own usage errors, and those also exit
    with status 2, like the "No --config" case in load_config().

    :param msg: Human-readable description of the problem.
    :param code: Exit status handed to ``sys.exit`` (default 1).
    """
    print("arvados-node-manager: " + msg, file=sys.stderr)
    sys.exit(code)
27
def parse_cli(args):
    """Parse the node manager's command-line options.

    :param args: List of argument strings; ``None`` lets argparse fall back
        to ``sys.argv[1:]``.
    :returns: ``argparse.Namespace`` with ``foreground`` and ``config``
        attributes (``--version`` prints and exits instead).
    """
    cli = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    version_text = "%s %s" % (sys.argv[0], __version__)
    # (flags, keyword options) pairs, registered in their original order.
    option_table = (
        (('--version',),
         {'action': 'version', 'version': version_text,
          'help': 'Print version and exit.'}),
        (('--foreground',),
         {'action': 'store_true', 'default': False,
          'help': "Run in the foreground.  Don't daemonize."}),
        (('--config',),
         {'help': "Path to configuration file"}),
    )
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    return cli.parse_args(args)
42
def load_config(path):
    """Load the node manager configuration file at *path*.

    Calls abort() with status 2 when *path* is empty or None. Calls abort()
    with the default status when opening or reading the file raises
    IOError/OSError.

    :returns: A ``NodeManagerConfig`` populated from the file.
    """
    if not path:
        abort("No --config file specified", 2)
    loaded = nmconfig.NodeManagerConfig()
    try:
        with open(path) as stream:
            loaded.readfp(stream)
    except (IOError, OSError) as err:
        problem = "Error reading configuration file {}: {}".format(path, err)
        abort(problem)
    return loaded
53
def setup_logging(path, level, **sublevels):
    """Direct log output to a file and set logger thresholds.

    Attaches a ``logging.FileHandler`` for *path* to the root logger, with
    records formatted as ``<date time> <logger>[<pid>] <LEVEL>: <message>``.
    The root logger's level is set to *level*.

    :param path: File the handler writes to.
    :param level: Level for the root logger (anything ``Logger.setLevel``
        accepts).
    :param sublevels: Extra keyword arguments mapping a logger name to the
        level that logger should use.
    """
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # items() works on Python 2 and 3; iteritems() exists only on Python 2.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
65
def build_server_calculator(config):
    """Create the ServerCalculator from the cloud's sizes and Daemon limits.

    Fetches the size list from a new cloud client and passes it through
    ``config.node_sizes``. Calls abort() if that yields nothing. The
    ``[Daemon] max_nodes`` and ``max_total_price`` settings are passed along
    as the calculator's limits.
    """
    offered_sizes = config.new_cloud_client().list_sizes()
    usable_sizes = config.node_sizes(offered_sizes)
    if not usable_sizes:
        abort("No valid node sizes configured")
    node_limit = config.getint('Daemon', 'max_nodes')
    price_limit = config.getfloat('Daemon', 'max_total_price')
    return ServerCalculator(usable_sizes, node_limit, price_limit)
73
def launch_pollers(config, server_calculator):
    """Start the shared timer actor and the three poller actors.

    Every poller gets the same timer and the ``[Daemon] poll_time`` /
    ``max_poll_time`` settings.

    :param config: Loaded node manager configuration.
    :param server_calculator: Passed through to the job queue poller.
    :returns: ``(timer, cloud_node_poller, arvados_node_poller,
        job_queue_poller)``, each the result of the actor's ``tell_proxy()``.
    """
    def started(actor_class, *actor_args):
        # Start one actor and return its tell_proxy().
        return actor_class.start(*actor_args).tell_proxy()

    interval = config.getint('Daemon', 'poll_time')
    max_interval = config.getint('Daemon', 'max_poll_time')

    # The timer actor is started with one tenth of the poll interval.
    timer = started(TimedCallBackActor, interval / 10.0)
    cloud_poller = started(CloudNodeListMonitorActor,
                           config.new_cloud_client(), timer,
                           interval, max_interval)
    arvados_poller = started(ArvadosNodeListMonitorActor,
                             config.new_arvados_client(), timer,
                             interval, max_interval)
    queue_poller = started(JobQueueMonitorActor,
                           config.new_arvados_client(), timer,
                           server_calculator, interval, max_interval)
    return timer, cloud_poller, arvados_poller, queue_poller
87
# Signal number -> how many times shutdown_signal has handled that signal.
_caught_signals = {}
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates each time the same signal repeats.

    If the daemon actor has not been started yet, every pykka actor is
    stopped and the process exits. Otherwise the first delivery of a given
    signal asks the daemon to shut down and the second stops every pykka
    actor. Any further delivery exits at once. Exits use ``-signal_code`` as
    the status passed to ``sys.exit``.
    """
    times_seen = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = times_seen + 1
    if node_daemon is not None and times_seen == 0:
        node_daemon.shutdown()
        return
    if node_daemon is not None and times_seen == 1:
        pykka.ActorRegistry.stop_all()
        return
    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
    sys.exit(-signal_code)
101
def main(args=None):
    """Run the node manager until it is shut down.

    Parses the command line, loads the configuration, and daemonizes unless
    ``--foreground`` was given. It then installs ``shutdown_signal`` for
    SIGINT, SIGQUIT and SIGTERM and starts the poller, updater, daemon and
    watchdog actors. Finally it waits for a shutdown signal and then for the
    daemon actor to stop.

    :param args: Argument list for ``parse_cli``; ``None`` means
        ``sys.argv[1:]``.

    Any ``Exception`` raised after the signal handlers are installed is
    logged rather than propagated. All pykka actors are stopped on the way
    out.
    """
    global node_daemon
    args = parse_cli(args)
    config = load_config(args.config)

    if not args.foreground:
        daemon.DaemonContext().open()
    # Install our handlers after DaemonContext.open(), which applies its own
    # signal map when it daemonizes.
    for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
        signal.signal(sigcode, shutdown_signal)

    try:
        setup_logging(config.get('Logging', 'file'), **config.log_levels())
        node_setup, node_shutdown, node_update, node_monitor = \
            config.dispatch_classes()
        server_calculator = build_server_calculator(config)
        timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
            launch_pollers(config, server_calculator)
        # The updater and daemon receive the client factory methods
        # themselves (not client instances), presumably so they can create
        # their own clients.
        cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
        node_daemon = NodeManagerDaemonActor.start(
            job_queue_poller, arvados_node_poller, cloud_node_poller,
            cloud_node_updater, timer,
            config.new_arvados_client, config.new_cloud_client,
            config.shutdown_windows(),
            server_calculator,
            config.getint('Daemon', 'min_nodes'),
            config.getint('Daemon', 'max_nodes'),
            config.getint('Daemon', 'poll_stale_after'),
            config.getint('Daemon', 'boot_fail_after'),
            config.getint('Daemon', 'node_stale_after'),
            node_setup, node_shutdown, node_monitor,
            max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()

        WatchdogActor.start(config.getint('Daemon', 'watchdog'),
                            cloud_node_poller.actor_ref,
                            arvados_node_poller.actor_ref,
                            job_queue_poller.actor_ref,
                            node_daemon.actor_ref)

        # Wait for the first shutdown signal.  A signal may already have been
        # handled after node_daemon was assigned; the handler has then asked
        # the daemon to shut down.  Pausing in that case would block until a
        # second signal arrived even though the daemon is already stopping.
        if not _caught_signals:
            signal.pause()
        daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
        while not daemon_stopped():
            time.sleep(1)
    except Exception:
        # NOTE(review): this handler also covers the wait loop above, not
        # only setup, and main() still returns normally afterwards.
        logging.exception("Uncaught exception during setup")
    finally:
        pykka.ActorRegistry.stop_all()
148
149
# Allow running this module directly as a script; calls main() with no
# arguments, so options are read from sys.argv.
if __name__ == '__main__':
    main()