Merge branch '4380-node-manager-computenode-reorg-wip'
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import argparse
6 import logging
7 import signal
8 import sys
9 import time
10
11 import daemon
12 import pykka
13
14 from . import config as nmconfig
15 from .computenode.dispatch import ComputeNodeUpdateActor
16 from .daemon import NodeManagerDaemonActor
17 from .jobqueue import JobQueueMonitorActor, ServerCalculator
18 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
19 from .timedcallback import TimedCallBackActor
20
# Proxy for the running NodeManagerDaemonActor.  Stays None until main()
# starts the actor; shutdown_signal() reads it to decide how to react.
node_daemon = None
22
def abort(msg, code=1):
    """Report a fatal error and terminate the process.

    msg: text appended to the "arvados-node-manager: " prefix.
    code: exit status handed to sys.exit() (default 1).

    Raises SystemExit.
    """
    # Diagnostics belong on stderr so they are not mixed into stdout.
    print("arvados-node-manager: " + msg, file=sys.stderr)
    sys.exit(code)
26
def parse_cli(args):
    """Parse command-line arguments and return the argparse namespace.

    args: list of argument strings, or None to let argparse read sys.argv.

    The namespace carries `foreground` (bool, default False) and `config`
    (string, or None when the option is absent).
    """
    cli = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    # (flag, keyword arguments) pairs, registered in this order.
    option_specs = (
        ('--foreground', dict(
            action='store_true', default=False,
            help="Run in the foreground.  Don't daemonize.")),
        ('--config', dict(
            help="Path to configuration file")),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli.parse_args(args)
37
def load_config(path):
    """Build a NodeManagerConfig populated from the file at `path`.

    Calls abort() with status 2 when `path` is empty, and with the default
    status when the file cannot be opened or read (IOError/OSError).
    """
    if not path:
        abort("No --config file specified", 2)
    node_config = nmconfig.NodeManagerConfig()
    try:
        with open(path) as source:
            node_config.readfp(source)
    except (IOError, OSError) as read_error:
        message = "Error reading configuration file {}: {}".format(
            path, read_error)
        abort(message)
    return node_config
48
def setup_logging(path, level, **sublevels):
    """Attach a file handler to the root logger and set logger levels.

    path: file name passed to logging.FileHandler.
    level: level set on the root logger.
    sublevels: keyword arguments mapping a logger name to the level to set
        on that named logger.
    """
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # dict.items() exists on both Python 2 and 3; iteritems() is 2-only.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
60
def launch_pollers(config):
    """Start the timer actor and the three list-polling actors.

    Reads min_nodes, max_nodes, poll_time and max_poll_time from the
    'Daemon' section of `config`.  Calls abort() when config.node_sizes()
    yields nothing for the cloud client's size list.

    Returns a tuple of actor proxies:
    (timer, cloud_node_poller, arvados_node_poller, job_queue_poller).
    """
    cloud_client = config.new_cloud_client()
    arvados_client = config.new_arvados_client()
    usable_sizes = config.node_sizes(cloud_client.list_sizes())
    if not usable_sizes:
        abort("No valid node sizes configured")

    def daemon_int(option):
        # Integer option from the 'Daemon' section.
        return config.getint('Daemon', option)

    server_calculator = ServerCalculator(
        usable_sizes, daemon_int('min_nodes'), daemon_int('max_nodes'))
    poll_time = daemon_int('poll_time')
    max_poll_time = daemon_int('max_poll_time')

    # The timer is started with one tenth of the poll interval.
    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
    poll_args = (timer, poll_time, max_poll_time)
    cloud_node_poller = CloudNodeListMonitorActor.start(
        cloud_client, *poll_args).proxy()
    arvados_node_poller = ArvadosNodeListMonitorActor.start(
        arvados_client, *poll_args).proxy()
    # The job queue poller is handed a freshly created Arvados client.
    job_queue_poller = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        poll_time, max_poll_time).proxy()
    return (timer, cloud_node_poller, arvados_node_poller, job_queue_poller)
84
# Number of times each signal code has been delivered to shutdown_signal().
_caught_signals = {}
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates each time the same signal repeats.

    With no daemon actor started yet: stop all actors and exit.
    Otherwise, for a given signal code: the first delivery asks the daemon
    to shut down, the second stops every registered actor, and any later
    delivery exits the process.  Exits use status -signal_code.
    """
    earlier_hits = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = earlier_hits + 1
    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
    if earlier_hits == 0:
        node_daemon.shutdown()
        return
    if earlier_hits == 1:
        pykka.ActorRegistry.stop_all()
        return
    sys.exit(-signal_code)
98
def main(args=None):
    """Entry point: configure, optionally daemonize, and run the actors.

    args: argument list for parse_cli(); None lets argparse use sys.argv.

    Sequence: parse options and load the config, daemonize unless
    --foreground was given, install the shutdown signal handlers, set up
    logging, start the pollers, the node updater and the daemon actor,
    then wait for the daemon actor to stop.
    """
    global node_daemon
    cli_options = parse_cli(args)
    config = load_config(cli_options.config)

    run_in_background = not cli_options.foreground
    if run_in_background:
        daemon.DaemonContext().open()
    for caught in (signal.SIGINT, signal.SIGQUIT, signal.SIGTERM):
        signal.signal(caught, shutdown_signal)

    log_path = config.get('Logging', 'file')
    setup_logging(log_path, **config.log_levels())
    pollers = launch_pollers(config)
    timer, cloud_node_poller, arvados_node_poller, job_queue_poller = pollers
    updater_ref = ComputeNodeUpdateActor.start(config.new_cloud_client)
    cloud_node_updater = updater_ref.proxy()
    # The client factories are passed uncalled; only shutdown_windows()
    # and the integer options are evaluated here.
    daemon_ref = NodeManagerDaemonActor.start(
        job_queue_poller, arvados_node_poller, cloud_node_poller,
        cloud_node_updater, timer,
        config.new_arvados_client, config.new_cloud_client,
        config.shutdown_windows(),
        config.getint('Daemon', 'min_nodes'),
        config.getint('Daemon', 'max_nodes'),
        config.getint('Daemon', 'poll_stale_after'),
        config.getint('Daemon', 'node_stale_after'))
    node_daemon = daemon_ref.proxy()

    # Sleep until a signal arrives, then poll once a second until the
    # daemon actor's stopped event is set before stopping the rest.
    signal.pause()
    stopped_event = node_daemon.actor_ref.actor_stopped
    while not stopped_event.is_set():
        time.sleep(1)
    pykka.ActorRegistry.stop_all()
129
130
# Run main() only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()