Merge branch '8087-arv-cli-request-body-from-file' of https://github.com/wtsi-hgi...
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import argparse
6 import logging
7 import signal
8 import sys
9 import time
10
11 import daemon
12 import pykka
13
14 from . import config as nmconfig
15 from .daemon import NodeManagerDaemonActor
16 from .jobqueue import JobQueueMonitorActor, ServerCalculator
17 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
18 from .timedcallback import TimedCallBackActor
19
20 node_daemon = None
21
22 def abort(msg, code=1):
23     print("arvados-node-manager: " + msg)
24     sys.exit(code)
25
26 def parse_cli(args):
27     parser = argparse.ArgumentParser(
28         prog='arvados-node-manager',
29         description="Dynamically allocate Arvados cloud compute nodes")
30     parser.add_argument(
31         '--foreground', action='store_true', default=False,
32         help="Run in the foreground.  Don't daemonize.")
33     parser.add_argument(
34         '--config', help="Path to configuration file")
35     return parser.parse_args(args)
36
37 def load_config(path):
38     if not path:
39         abort("No --config file specified", 2)
40     config = nmconfig.NodeManagerConfig()
41     try:
42         with open(path) as config_file:
43             config.readfp(config_file)
44     except (IOError, OSError) as error:
45         abort("Error reading configuration file {}: {}".format(path, error))
46     return config
47
48 def setup_logging(path, level, **sublevels):
49     handler = logging.FileHandler(path)
50     handler.setFormatter(logging.Formatter(
51             '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
52             '%Y-%m-%d %H:%M:%S'))
53     root_logger = logging.getLogger()
54     root_logger.addHandler(handler)
55     root_logger.setLevel(level)
56     for logger_name, sublevel in sublevels.iteritems():
57         sublogger = logging.getLogger(logger_name)
58         sublogger.setLevel(sublevel)
59
60 def build_server_calculator(config):
61     cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
62     if not cloud_size_list:
63         abort("No valid node sizes configured")
64     return ServerCalculator(cloud_size_list,
65                             config.getint('Daemon', 'max_nodes'),
66                             config.getfloat('Daemon', 'max_total_price'))
67
68 def launch_pollers(config, server_calculator):
69     poll_time = config.getint('Daemon', 'poll_time')
70     max_poll_time = config.getint('Daemon', 'max_poll_time')
71
72     timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
73     cloud_node_poller = CloudNodeListMonitorActor.start(
74         config.new_cloud_client(), timer, poll_time, max_poll_time).tell_proxy()
75     arvados_node_poller = ArvadosNodeListMonitorActor.start(
76         config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
77     job_queue_poller = JobQueueMonitorActor.start(
78         config.new_arvados_client(), timer, server_calculator,
79         poll_time, max_poll_time).tell_proxy()
80     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
81
82 _caught_signals = {}
83 def shutdown_signal(signal_code, frame):
84     current_count = _caught_signals.get(signal_code, 0)
85     _caught_signals[signal_code] = current_count + 1
86     if node_daemon is None:
87         pykka.ActorRegistry.stop_all()
88         sys.exit(-signal_code)
89     elif current_count == 0:
90         node_daemon.shutdown()
91     elif current_count == 1:
92         pykka.ActorRegistry.stop_all()
93     else:
94         sys.exit(-signal_code)
95
96 def main(args=None):
97     global node_daemon
98     args = parse_cli(args)
99     config = load_config(args.config)
100
101     if not args.foreground:
102         daemon.DaemonContext().open()
103     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
104         signal.signal(sigcode, shutdown_signal)
105
106     try:
107         setup_logging(config.get('Logging', 'file'), **config.log_levels())
108         node_setup, node_shutdown, node_update, node_monitor = \
109             config.dispatch_classes()
110         server_calculator = build_server_calculator(config)
111         timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
112             launch_pollers(config, server_calculator)
113         cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
114         node_daemon = NodeManagerDaemonActor.start(
115             job_queue_poller, arvados_node_poller, cloud_node_poller,
116             cloud_node_updater, timer,
117             config.new_arvados_client, config.new_cloud_client,
118             config.shutdown_windows(),
119             server_calculator,
120             config.getint('Daemon', 'min_nodes'),
121             config.getint('Daemon', 'max_nodes'),
122             config.getint('Daemon', 'poll_stale_after'),
123             config.getint('Daemon', 'boot_fail_after'),
124             config.getint('Daemon', 'node_stale_after'),
125             node_setup, node_shutdown, node_monitor,
126             max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
127
128         signal.pause()
129         daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
130         while not daemon_stopped():
131             time.sleep(1)
132     except Exception:
133         logging.exception("Uncaught exception during setup")
134     finally:
135         pykka.ActorRegistry.stop_all()
136
137
138 if __name__ == '__main__':
139     main()