8784: Fix test for latest firefox.
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import argparse
6 import logging
7 import signal
8 import sys
9 import time
10
11 import daemon
12 import pykka
13 import libcloud
14
15 from . import config as nmconfig
16 from . import status
17 from .baseactor import WatchdogActor
18 from .daemon import NodeManagerDaemonActor
19 from .jobqueue import JobQueueMonitorActor, ServerCalculator
20 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
21 from .timedcallback import TimedCallBackActor
22 from ._version import __version__
23
# Tell proxy for the NodeManagerDaemonActor.  main() assigns it once the
# actor has been started; shutdown_signal() reads it to decide how to react.
node_daemon = None
# Reference returned by WatchdogActor.start() in main(); shutdown_signal()
# stops it on the first signal it handles.
watchdog = None
26
def abort(msg, code=1):
    """Print msg behind the program-name prefix, then exit the process.

    Arguments:
    msg -- text appended to the "arvados-node-manager: " prefix
    code -- exit status handed to sys.exit() (default 1)
    """
    complaint = "arvados-node-manager: " + msg
    print(complaint)
    sys.exit(code)
30
def parse_cli(args):
    """Parse the node manager's command-line arguments.

    args is the argument list given to argparse's parse_args().  The
    returned namespace carries `foreground` (bool, default False) and
    `config` (path string, or None when --config was not given).
    """
    version_text = "%s %s" % (sys.argv[0], __version__)
    # (flags, add_argument keyword options), registered in this order.
    option_table = (
        (('--version',),
         dict(action='version', version=version_text,
              help='Print version and exit.')),
        (('--foreground',),
         dict(action='store_true', default=False,
              help="Run in the foreground.  Don't daemonize.")),
        (('--config',),
         dict(help="Path to configuration file")),
    )
    cli = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    return cli.parse_args(args)
45
def load_config(path):
    """Read the configuration file at path into a NodeManagerConfig.

    Calls abort() with status 2 when path is empty or None, and with the
    default status when the file cannot be opened or read.
    """
    if not path:
        abort("No --config file specified", 2)
    loaded = nmconfig.NodeManagerConfig()
    try:
        with open(path) as source:
            loaded.readfp(source)
    except (IOError, OSError) as failure:
        complaint = "Error reading configuration file {}: {}".format(
            path, failure)
        abort(complaint)
    return loaded
56
def setup_logging(path, level, **sublevels):
    """Attach a file handler to the root logger and set logger levels.

    Arguments:
    path -- file the new logging.FileHandler writes to
    level -- level applied to the root logger
    sublevels -- keyword arguments mapping a logger name to the level
      that named logger should use

    Returns the root logger.
    """
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # items() works on both Python 2 and Python 3 dicts; iteritems() is
    # Python 2 only and raises AttributeError on Python 3.
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
    return root_logger
69
def build_server_calculator(config):
    """Build a ServerCalculator from the cloud's sizes and Daemon settings.

    The sizes listed by a new cloud client are passed through
    config.node_sizes(); abort() is called when the result is empty.
    """
    listed_sizes = config.new_cloud_client().list_sizes()
    usable_sizes = config.node_sizes(listed_sizes)
    if not usable_sizes:
        abort("No valid node sizes configured")
    node_cap = config.getint('Daemon', 'max_nodes')
    price_cap = config.getfloat('Daemon', 'max_total_price')
    mem_scaling = config.getfloat('Daemon', 'node_mem_scaling')
    return ServerCalculator(usable_sizes, node_cap, price_cap, mem_scaling)
78
def launch_pollers(config, server_calculator):
    """Start the timer actor and the three polling actors.

    Returns a 4-tuple of tell proxies:
    (timer, cloud node poller, Arvados node poller, job queue poller).
    """
    interval = config.getint('Daemon', 'poll_time')
    interval_cap = config.getint('Daemon', 'max_poll_time')

    # The timer is started with a tenth of the poll interval.
    timer_ref = TimedCallBackActor.start(interval / 10.0)
    timer = timer_ref.tell_proxy()

    cloud_ref = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, server_calculator,
        interval, interval_cap)
    cloud_nodes = cloud_ref.tell_proxy()

    arvados_ref = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer, interval, interval_cap)
    arvados_nodes = arvados_ref.tell_proxy()

    queue_ref = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        config.getboolean('Arvados', 'jobs_queue'),
        config.getboolean('Arvados', 'slurm_queue'),
        interval, interval_cap)
    job_queue = queue_ref.tell_proxy()

    return timer, cloud_nodes, arvados_nodes, job_queue
95
# Number of times each signal number has been handled so far.
_caught_signals = {}
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates shutdown each time a signal repeats.

    Arguments:
    signal_code -- number of the signal being handled
    frame -- current stack frame supplied by the signal module (unused)

    Behavior, per signal number:
    * No node daemon started yet: stop all actors and exit immediately.
    * First delivery: stop the watchdog (if started) and ask the node
      daemon to shut down.
    * Second delivery: stop every registered actor.
    * Any later delivery: exit the process.
    """
    current_count = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = current_count + 1
    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
    elif current_count == 0:
        # main() assigns node_daemon before it starts the watchdog, so a
        # signal can arrive while watchdog is still None.  Skip the stop
        # in that case rather than raising AttributeError and never
        # reaching node_daemon.shutdown().
        if watchdog is not None:
            watchdog.stop()
        node_daemon.shutdown()
    elif current_count == 1:
        pykka.ActorRegistry.stop_all()
    else:
        sys.exit(-signal_code)
110
def main(args=None):
    """Entry point: parse arguments, start all actors, wait for shutdown.

    args is handed to parse_cli() unchanged.  The function sets the
    module-level node_daemon and watchdog globals so that
    shutdown_signal() can act on them, and always stops every registered
    pykka actor before returning.
    """
    global node_daemon, watchdog
    args = parse_cli(args)
    config = load_config(args.config)

    # Detach from the terminal unless --foreground was given.
    if not args.foreground:
        daemon.DaemonContext().open()
    # Route the usual termination signals through shutdown_signal().
    for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
        signal.signal(sigcode, shutdown_signal)

    status.Server(config).start()

    try:
        # config.log_levels() supplies the keyword arguments of
        # setup_logging() (its `level` plus per-logger sublevels).
        root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
        root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
        node_setup, node_shutdown, node_update, node_monitor = \
            config.dispatch_classes()
        server_calculator = build_server_calculator(config)
        timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
            launch_pollers(config, server_calculator)
        # new_cloud_client / new_arvados_client are passed uncalled here
        # and below: the actors receive the factory, not a client.
        cloud_node_updater = node_update.start(config.new_cloud_client, timer).tell_proxy()
        node_daemon = NodeManagerDaemonActor.start(
            job_queue_poller, arvados_node_poller, cloud_node_poller,
            cloud_node_updater, timer,
            config.new_arvados_client, config.new_cloud_client,
            config.shutdown_windows(),
            server_calculator,
            config.getint('Daemon', 'min_nodes'),
            config.getint('Daemon', 'max_nodes'),
            config.getint('Daemon', 'poll_stale_after'),
            config.getint('Daemon', 'boot_fail_after'),
            config.getint('Daemon', 'node_stale_after'),
            node_setup, node_shutdown, node_monitor,
            max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()

        # The watchdog is given the actor refs of the three pollers and
        # of the node daemon.
        watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
                            cloud_node_poller.actor_ref,
                            arvados_node_poller.actor_ref,
                            job_queue_poller.actor_ref,
                            node_daemon.actor_ref)

        # Sleep until a signal handler has run, then poll once a second
        # until the node daemon actor reports that it has stopped.
        signal.pause()
        daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
        while not daemon_stopped():
            time.sleep(1)
    except Exception:
        # This handler covers the whole try body, so despite the wording
        # it also logs exceptions raised during the wait above.
        logging.exception("Uncaught exception during setup")
    finally:
        pykka.ActorRegistry.stop_all()
160
161
# Run main() only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()