Merge branch '8784-dir-listings'
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import argparse
9 import logging
10 import signal
11 import sys
12 import time
13
14 import daemon
15 import pykka
16 import libcloud
17
18 from . import config as nmconfig
19 from . import status
20 from .baseactor import WatchdogActor
21 from .daemon import NodeManagerDaemonActor
22 from .jobqueue import JobQueueMonitorActor, ServerCalculator
23 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
24 from .timedcallback import TimedCallBackActor
25 from ._version import __version__
26
27 node_daemon = None
28 watchdog = None
29
30 def abort(msg, code=1):
31     print("arvados-node-manager: " + msg)
32     sys.exit(code)
33
34 def parse_cli(args):
35     parser = argparse.ArgumentParser(
36         prog='arvados-node-manager',
37         description="Dynamically allocate Arvados cloud compute nodes")
38     parser.add_argument(
39         '--version', action='version',
40         version="%s %s" % (sys.argv[0], __version__),
41         help='Print version and exit.')
42     parser.add_argument(
43         '--foreground', action='store_true', default=False,
44         help="Run in the foreground.  Don't daemonize.")
45     parser.add_argument(
46         '--config', help="Path to configuration file")
47     return parser.parse_args(args)
48
49 def load_config(path):
50     if not path:
51         abort("No --config file specified", 2)
52     config = nmconfig.NodeManagerConfig()
53     try:
54         with open(path) as config_file:
55             config.readfp(config_file)
56     except (IOError, OSError) as error:
57         abort("Error reading configuration file {}: {}".format(path, error))
58     return config
59
60 def setup_logging(path, level, **sublevels):
61     handler = logging.FileHandler(path)
62     handler.setFormatter(logging.Formatter(
63             '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
64             '%Y-%m-%d %H:%M:%S'))
65     root_logger = logging.getLogger()
66     root_logger.addHandler(handler)
67     root_logger.setLevel(level)
68     for logger_name, sublevel in sublevels.iteritems():
69         sublogger = logging.getLogger(logger_name)
70         sublogger.setLevel(sublevel)
71     return root_logger
72
73 def build_server_calculator(config):
74     cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
75     if not cloud_size_list:
76         abort("No valid node sizes configured")
77     return ServerCalculator(cloud_size_list,
78                             config.getint('Daemon', 'max_nodes'),
79                             config.getfloat('Daemon', 'max_total_price'),
80                             config.getfloat('Daemon', 'node_mem_scaling'))
81
82 def launch_pollers(config, server_calculator):
83     poll_time = config.getint('Daemon', 'poll_time')
84     max_poll_time = config.getint('Daemon', 'max_poll_time')
85
86     timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
87     cloud_node_poller = CloudNodeListMonitorActor.start(
88         config.new_cloud_client(), timer, server_calculator, poll_time, max_poll_time).tell_proxy()
89     arvados_node_poller = ArvadosNodeListMonitorActor.start(
90         config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
91     job_queue_poller = JobQueueMonitorActor.start(
92         config.new_arvados_client(), timer, server_calculator,
93         config.getboolean('Arvados', 'jobs_queue'),
94         config.getboolean('Arvados', 'slurm_queue'),
95         poll_time, max_poll_time
96     ).tell_proxy()
97     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
98
99 _caught_signals = {}
100 def shutdown_signal(signal_code, frame):
101     current_count = _caught_signals.get(signal_code, 0)
102     _caught_signals[signal_code] = current_count + 1
103     if node_daemon is None:
104         pykka.ActorRegistry.stop_all()
105         sys.exit(-signal_code)
106     elif current_count == 0:
107         watchdog.stop()
108         node_daemon.shutdown()
109     elif current_count == 1:
110         pykka.ActorRegistry.stop_all()
111     else:
112         sys.exit(-signal_code)
113
114 def main(args=None):
115     global node_daemon, watchdog
116     args = parse_cli(args)
117     config = load_config(args.config)
118
119     if not args.foreground:
120         daemon.DaemonContext().open()
121     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
122         signal.signal(sigcode, shutdown_signal)
123
124     status.Server(config).start()
125
126     try:
127         root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
128         root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
129         node_setup, node_shutdown, node_update, node_monitor = \
130             config.dispatch_classes()
131         server_calculator = build_server_calculator(config)
132         timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
133             launch_pollers(config, server_calculator)
134         cloud_node_updater = node_update.start(config.new_cloud_client, timer).tell_proxy()
135         node_daemon = NodeManagerDaemonActor.start(
136             job_queue_poller, arvados_node_poller, cloud_node_poller,
137             cloud_node_updater, timer,
138             config.new_arvados_client, config.new_cloud_client,
139             config.shutdown_windows(),
140             server_calculator,
141             config.getint('Daemon', 'min_nodes'),
142             config.getint('Daemon', 'max_nodes'),
143             config.getint('Daemon', 'poll_stale_after'),
144             config.getint('Daemon', 'boot_fail_after'),
145             config.getint('Daemon', 'node_stale_after'),
146             node_setup, node_shutdown, node_monitor,
147             max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
148
149         watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
150                             cloud_node_poller.actor_ref,
151                             arvados_node_poller.actor_ref,
152                             job_queue_poller.actor_ref,
153                             node_daemon.actor_ref)
154
155         signal.pause()
156         daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
157         while not daemon_stopped():
158             time.sleep(1)
159     except Exception:
160         logging.exception("Uncaught exception during setup")
161     finally:
162         pykka.ActorRegistry.stop_all()
163
164
165 if __name__ == '__main__':
166     main()