Merge branch 'master' into 4024-pipeline-instances-scroll
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import argparse
6 import logging
7 import signal
8 import sys
9 import time
10
11 import daemon
12 import pykka
13
14 from . import config as nmconfig
15 from .computenode import \
16     ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \
17     ShutdownTimer
18 from .daemon import NodeManagerDaemonActor
19 from .jobqueue import JobQueueMonitorActor, ServerCalculator
20 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
21 from .timedcallback import TimedCallBackActor
22
23 node_daemon = None
24
def abort(msg, code=1):
    """Report a fatal error and terminate the process.

    The message is prefixed with the program name and written to standard
    error, so diagnostics stay separate from regular program output.

    Arguments:
      msg: Text describing the failure.
      code: Exit status handed to sys.exit() (default 1).

    Raises:
      SystemExit: always.
    """
    print("arvados-node-manager: " + msg, file=sys.stderr)
    sys.exit(code)
28
def parse_cli(args):
    """Parse the node manager's command-line arguments.

    Arguments:
      args: List of argument strings, or None to let argparse read the
        process's own command line.

    Returns an argparse namespace with two attributes: `foreground`
    (bool, False unless --foreground is given) and `config` (the path
    string given to --config, or None when it is omitted).
    """
    cli = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    # Each entry pairs an option's flag names with its add_argument settings.
    option_specs = [
        (('--foreground',),
         dict(action='store_true', default=False,
              help="Run in the foreground.  Don't daemonize.")),
        (('--config',),
         dict(help="Path to configuration file")),
    ]
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    return cli.parse_args(args)
39
def load_config(path):
    """Build a NodeManagerConfig populated from the file at `path`.

    Arguments:
      path: Location of the configuration file.

    Returns the NodeManagerConfig after it has read the file.

    Calls abort() with exit code 2 when `path` is empty, and abort() with
    its default exit code when opening or reading the file raises IOError
    or OSError.
    """
    if not path:
        abort("No --config file specified", 2)
    node_config = nmconfig.NodeManagerConfig()
    try:
        with open(path) as source:
            node_config.readfp(source)
    except (IOError, OSError) as read_error:
        failure = "Error reading configuration file {}: {}".format(
            path, read_error)
        abort(failure)
    return node_config
50
def setup_logging(path, level, **sublevels):
    """Route log records to a file and configure logger verbosity.

    A FileHandler for `path` is attached to the root logger, formatting
    each record as "<timestamp> <logger name>[<pid>] <level>: <message>".

    Arguments:
      path: File the handler writes log records to.
      level: Level applied to the root logger.
      **sublevels: Maps logger names to the level each named logger
        should use.
    """
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # items() is available on both Python 2 and Python 3 dicts, whereas
    # iteritems() exists only on Python 2.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
62
def launch_pollers(config):
    """Start the timer actor and the three polling actors.

    Arguments:
      config: Configuration object supplying new_cloud_client(),
        new_arvados_client(), node_sizes(), and the integer [Daemon]
        settings min_nodes, max_nodes, poll_time and max_poll_time.

    Returns a tuple of actor proxies, in this order:
    (timer, cloud_node_poller, arvados_node_poller, job_queue_poller).

    Calls abort() when config.node_sizes() returns nothing usable.
    """
    cloud_api = config.new_cloud_client()
    arvados_api = config.new_arvados_client()
    usable_sizes = config.node_sizes(cloud_api.list_sizes())
    if not usable_sizes:
        abort("No valid node sizes configured")

    min_nodes = config.getint('Daemon', 'min_nodes')
    max_nodes = config.getint('Daemon', 'max_nodes')
    calculator = ServerCalculator(usable_sizes, min_nodes, max_nodes)
    poll_time = config.getint('Daemon', 'poll_time')
    max_poll_time = config.getint('Daemon', 'max_poll_time')

    # The timer actor is started with one tenth of the poll time.
    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
    # Both node list monitors take the same arguments after their client.
    schedule = (timer, poll_time, max_poll_time)
    cloud_nodes = CloudNodeListMonitorActor.start(
        cloud_api, *schedule).proxy()
    arvados_nodes = ArvadosNodeListMonitorActor.start(
        arvados_api, *schedule).proxy()
    # The job queue monitor is handed a separate, newly created Arvados
    # client rather than the one given to the node list monitor.
    job_queue = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, calculator,
        poll_time, max_poll_time).proxy()
    return timer, cloud_nodes, arvados_nodes, job_queue
86
# Maps each signal number to how many times it has been received so far.
_caught_signals = {}
def shutdown_signal(signal_code, frame):
    """Handle a termination signal, escalating on repeated delivery.

    If no daemon actor has been started yet, every actor is stopped and
    the process exits.  Otherwise the response depends on how many times
    this particular signal has already been received:

      * first time: ask the daemon actor to shut down;
      * second time: stop every registered actor;
      * any later time: exit the process.

    Exits are made with status -signal_code.

    Arguments:
      signal_code: Number of the signal that was delivered.
      frame: Interrupted stack frame (unused).
    """
    times_seen = _caught_signals.setdefault(signal_code, 0)
    _caught_signals[signal_code] = times_seen + 1
    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
    if times_seen == 0:
        node_daemon.shutdown()
        return
    if times_seen == 1:
        pykka.ActorRegistry.stop_all()
        return
    sys.exit(-signal_code)
100
def main(args=None):
    """Run the node manager until its daemon actor stops.

    Steps, in order: parse the command line and load the configuration;
    daemonize unless --foreground was given; install shutdown_signal for
    SIGINT, SIGQUIT and SIGTERM; set up logging; start the pollers, the
    compute node updater and the daemon actor; then wait for a signal and
    for the daemon actor to stop before stopping all remaining actors.

    Arguments:
      args: List of command-line argument strings, passed to parse_cli().
        Defaults to None.
    """
    global node_daemon
    options = parse_cli(args)
    config = load_config(options.config)

    if not options.foreground:
        daemon.DaemonContext().open()
    for signum in (signal.SIGINT, signal.SIGQUIT, signal.SIGTERM):
        signal.signal(signum, shutdown_signal)

    log_path = config.get('Logging', 'file')
    setup_logging(log_path, **config.log_levels())
    pollers = launch_pollers(config)
    timer, cloud_node_poller, arvados_node_poller, job_queue_poller = pollers
    cloud_node_updater = ComputeNodeUpdateActor.start(
        config.new_cloud_client).proxy()
    # The client constructors are passed uncalled, unlike in launch_pollers.
    node_daemon = NodeManagerDaemonActor.start(
        job_queue_poller, arvados_node_poller, cloud_node_poller,
        cloud_node_updater, timer,
        config.new_arvados_client, config.new_cloud_client,
        config.shutdown_windows(),
        config.getint('Daemon', 'min_nodes'),
        config.getint('Daemon', 'max_nodes'),
        config.getint('Daemon', 'poll_stale_after'),
        config.getint('Daemon', 'node_stale_after')).proxy()

    # Block until a signal handler has run, then poll once a second until
    # the daemon actor reports that it has stopped.
    signal.pause()
    stopped_flag = node_daemon.actor_ref.actor_stopped
    while not stopped_flag.is_set():
        time.sleep(1)
    pykka.ActorRegistry.stop_all()
131
132
133 if __name__ == '__main__':
134     main()