13937: Export stats as prometheus metrics. (WIP)
[arvados.git] / services / nodemanager / arvnodeman / launcher.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import argparse
9 import logging
10 import signal
11 import sys
12 import time
13
14 import daemon
15 import pykka
16 import libcloud
17
18 from . import config as nmconfig
19 from . import status
20 from .baseactor import WatchdogActor
21 from .daemon import NodeManagerDaemonActor
22 from .jobqueue import JobQueueMonitorActor, ServerCalculator
23 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
24 from .timedcallback import TimedCallBackActor
25 from ._version import __version__
26
# Handles to the running daemon and watchdog actors. main() assigns them
# (it declares both as globals) and shutdown_signal() reads them; both are
# None until the corresponding actor has been started.
node_daemon = watchdog = None
29
def abort(msg, code=1):
    """Report a fatal error and terminate the process.

    The message is printed to standard output, prefixed with the program
    name, and then SystemExit is raised with *code* as the exit status.

    :param msg: error text (must be a str; it is concatenated to the prefix).
    :param code: exit status, 1 by default.
    """
    prefix = "arvados-node-manager: "
    print(prefix + msg)
    raise SystemExit(code)
33
def parse_cli(args):
    """Parse the arvados-node-manager command line.

    :param args: list of argument strings; None lets argparse fall back to
        sys.argv[1:].
    :returns: an argparse.Namespace with ``foreground`` (bool, default
        False) and ``config`` (path string or None). ``--version`` prints
        the program path and version, then exits.
    """
    parser = argparse.ArgumentParser(
        prog='arvados-node-manager',
        description="Dynamically allocate Arvados cloud compute nodes")
    version_text = "%s %s" % (sys.argv[0], __version__)
    option_specs = (
        (('--version',),
         dict(action='version', version=version_text,
              help='Print version and exit.')),
        (('--foreground',),
         dict(action='store_true', default=False,
              help="Run in the foreground.  Don't daemonize.")),
        (('--config',),
         dict(help="Path to configuration file")),
    )
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)
    return parser.parse_args(args)
48
def load_config(path):
    """Load the node manager configuration from the file at *path*.

    Aborts with exit status 2 when *path* is empty or None. Aborts with the
    default status when opening or reading the file raises IOError/OSError.

    :returns: a populated nmconfig.NodeManagerConfig.
    """
    if not path:
        abort("No --config file specified", 2)
    cfg = nmconfig.NodeManagerConfig()
    try:
        with open(path) as handle:
            cfg.readfp(handle)
    except (IOError, OSError) as err:
        abort("Error reading configuration file {}: {}".format(path, err))
    return cfg
59
def setup_logging(path, level, **sublevels):
    """Direct all log records to a file and configure logger levels.

    :param path: log file path. A logging.FileHandler for it, with a
        timestamp/name/pid/level format, is attached to the root logger.
    :param level: level for the root logger (anything Logger.setLevel
        accepts).
    :param sublevels: logger name -> level; each named logger has its own
        level set.
    :returns: the root logger.
    """
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
    # items() rather than the Python-2-only iteritems(), so this also works
    # on Python 3 (where dict has no iteritems attribute at all).
    for logger_name, sublevel in sublevels.items():
        logging.getLogger(logger_name).setLevel(sublevel)
    return root_logger
72
def build_server_calculator(config):
    """Construct the ServerCalculator from the configured node sizes.

    Aborts if config.node_sizes() returns nothing. The calculator also
    receives the Daemon section's max_nodes (int), max_total_price (float)
    and node_mem_scaling (float).
    """
    sizes = config.node_sizes()
    if not sizes:
        abort("No valid node sizes configured")
    section = 'Daemon'
    max_nodes = config.getint(section, 'max_nodes')
    max_total_price = config.getfloat(section, 'max_total_price')
    node_mem_scaling = config.getfloat(section, 'node_mem_scaling')
    return ServerCalculator(sizes, max_nodes, max_total_price,
                            node_mem_scaling)
81
def launch_pollers(config, server_calculator):
    """Start the shared timer actor and the three poller actors.

    Each poller uses its own ``*_poll_time`` option from the Daemon
    section. A falsy value (e.g. 0) falls back to the global poll_time.

    :returns: tuple ``(timer, cloud_node_poller, arvados_node_poller,
        job_queue_poller)``; each element is the started actor's
        tell_proxy().
    """
    section = 'Daemon'
    base_poll = config.getfloat(section, 'poll_time')
    poll_ceiling = config.getint(section, 'max_poll_time')

    def interval(option):
        # Per-poller override, or the global poll_time when the option is falsy.
        return config.getfloat(section, option) or base_poll

    cloud_interval = interval('cloudlist_poll_time')
    node_interval = interval('nodelist_poll_time')
    wish_interval = interval('wishlist_poll_time')

    # The timer is started with one tenth of poll_time. Presumably this is
    # its check interval; TimedCallBackActor is not visible here, so confirm.
    timer = TimedCallBackActor.start(base_poll / 10.0).tell_proxy()

    cloud_actor = CloudNodeListMonitorActor.start(
        config.new_cloud_client(), timer, server_calculator,
        cloud_interval, poll_ceiling)
    arvados_actor = ArvadosNodeListMonitorActor.start(
        config.new_arvados_client(), timer, node_interval, poll_ceiling)
    job_actor = JobQueueMonitorActor.start(
        config.new_arvados_client(), timer, server_calculator,
        config.getboolean('Arvados', 'jobs_queue'),
        config.getboolean('Arvados', 'slurm_queue'),
        wish_interval, poll_ceiling)

    return (timer, cloud_actor.tell_proxy(), arvados_actor.tell_proxy(),
            job_actor.tell_proxy())
102
# Number of times each signal code has been received so far. Counts are
# kept per signal code, so escalation applies to repeats of the same signal.
_caught_signals = {}
def shutdown_signal(signal_code, frame):
    """Signal handler that escalates shutdown on repeated signals.

    - If the node manager daemon actor has not been started yet: stop all
      pykka actors and exit with status ``-signal_code``.
    - First receipt of this signal: stop the watchdog (if it has been
      started) and ask the daemon to shut down.
    - Second receipt: stop all pykka actors.
    - Any further receipt: exit with status ``-signal_code``.

    :param signal_code: the signal number delivered.
    :param frame: current stack frame (unused; part of the handler API).
    """
    current_count = _caught_signals.get(signal_code, 0)
    _caught_signals[signal_code] = current_count + 1
    if node_daemon is None:
        pykka.ActorRegistry.stop_all()
        sys.exit(-signal_code)
    elif current_count == 0:
        # main() assigns node_daemon before it starts the watchdog. A signal
        # arriving in that window would otherwise hit None.stop() and never
        # reach node_daemon.shutdown().
        if watchdog is not None:
            watchdog.stop()
        node_daemon.shutdown()
    elif current_count == 1:
        pykka.ActorRegistry.stop_all()
    else:
        sys.exit(-signal_code)
117
def main(args=None):
    """Entry point for arvados-node-manager.

    Steps, in order:
    - parse the command line and load the configuration;
    - daemonize unless --foreground is given;
    - install shutdown_signal for SIGINT, SIGQUIT and SIGTERM;
    - start the status server;
    - set up logging and start the actors (pollers, cloud node updater,
      node manager daemon, watchdog).

    It then blocks until the daemon actor has stopped. All pykka actors are
    stopped in the finally clause.

    :param args: command-line argument list, or None (passed to parse_cli).
    """
    global node_daemon, watchdog
    args = parse_cli(args)
    config = load_config(args.config)

    if not args.foreground:
        daemon.DaemonContext().open()
    # Handlers are installed before node_daemon exists; shutdown_signal
    # checks for node_daemon being None in that case.
    for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
        signal.signal(sigcode, shutdown_signal)

    # Started outside the try block: if this raises, the actor cleanup in
    # the finally clause below does not run (no actors exist yet anyway).
    status.Server(config).start()

    try:
        root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
        root_logger.info("%s %s started, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
        node_setup, node_shutdown, node_update, node_monitor = \
            config.dispatch_classes()
        server_calculator = build_server_calculator(config)
        timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
            launch_pollers(config, server_calculator)
        # config.new_cloud_client / new_arvados_client are passed uncalled
        # (as factories) here, unlike in launch_pollers where they are called.
        cloud_node_updater = node_update.start(config.new_cloud_client, timer).tell_proxy()
        node_daemon = NodeManagerDaemonActor.start(
            job_queue_poller, arvados_node_poller, cloud_node_poller,
            cloud_node_updater, timer,
            config.new_arvados_client, config.new_cloud_client,
            config.shutdown_windows(),
            server_calculator,
            config.getint('Daemon', 'min_nodes'),
            config.getint('Daemon', 'max_nodes'),
            config.getint('Daemon', 'poll_stale_after'),
            config.getint('Daemon', 'boot_fail_after'),
            config.getint('Daemon', 'node_stale_after'),
            node_setup, node_shutdown, node_monitor,
            max_total_price=config.getfloat('Daemon', 'max_total_price'),
            consecutive_idle_count=config.getint('Daemon', 'consecutive_idle_count'),).tell_proxy()

        # Started after node_daemon is assigned; until then the global
        # watchdog is still None.
        watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
                            cloud_node_poller.actor_ref,
                            arvados_node_poller.actor_ref,
                            job_queue_poller.actor_ref,
                            node_daemon.actor_ref)

        # Sleep until a signal handler runs (e.g. shutdown_signal), then poll
        # once per second until the daemon actor reports it has stopped.
        signal.pause()
        daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
        while not daemon_stopped():
            time.sleep(1)
    except Exception:
        # Only Exception subclasses are logged here; SystemExit (e.g. from
        # abort() or shutdown_signal) propagates, but finally still runs.
        logging.exception("Uncaught exception during setup")
    finally:
        # Stop every registered pykka actor on any exit path.
        pykka.ActorRegistry.stop_all()
168
169
if __name__ == "__main__":
    # Start the node manager only when run as a script, not on import.
    main()