X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a0e70cf1da033f7d94e728ab919bd8cfcabf3743..d5ddfd9d876a75327795793544d105051f2a306e:/services/nodemanager/arvnodeman/launcher.py diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py index d2f4afee06..9c45318fdc 100644 --- a/services/nodemanager/arvnodeman/launcher.py +++ b/services/nodemanager/arvnodeman/launcher.py @@ -10,13 +10,15 @@ import time import daemon import pykka +import libcloud from . import config as nmconfig -from .computenode.dispatch import ComputeNodeUpdateActor +from .baseactor import WatchdogActor from .daemon import NodeManagerDaemonActor from .jobqueue import JobQueueMonitorActor, ServerCalculator from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor from .timedcallback import TimedCallBackActor +from ._version import __version__ node_daemon = None @@ -28,6 +30,10 @@ def parse_cli(args): parser = argparse.ArgumentParser( prog='arvados-node-manager', description="Dynamically allocate Arvados cloud compute nodes") + parser.add_argument( + '--version', action='version', + version="%s %s" % (sys.argv[0], __version__), + help='Print version and exit.') parser.add_argument( '--foreground', action='store_true', default=False, help="Run in the foreground. Don't daemonize.") @@ -57,29 +63,28 @@ def setup_logging(path, level, **sublevels): for logger_name, sublevel in sublevels.iteritems(): sublogger = logging.getLogger(logger_name) sublogger.setLevel(sublevel) + return root_logger -def launch_pollers(config): - cloud_client = config.new_cloud_client() - arvados_client = config.new_arvados_client() - cloud_size_list = config.node_sizes(cloud_client.list_sizes()) +def build_server_calculator(config): + cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes()) if not cloud_size_list: abort("No valid node sizes configured") + return ServerCalculator(cloud_size_list, + config.getint('Daemon', 'max_nodes'), + config.getfloat('Daemon', 'max_total_price')) - server_calculator = ServerCalculator( - cloud_size_list, - config.getint('Daemon', 'min_nodes'), - config.getint('Daemon', 'max_nodes')) +def launch_pollers(config, server_calculator): poll_time = config.getint('Daemon', 'poll_time') max_poll_time = config.getint('Daemon', 'max_poll_time') - timer = TimedCallBackActor.start(poll_time / 10.0).proxy() + timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy() cloud_node_poller = CloudNodeListMonitorActor.start( - cloud_client, timer, poll_time, max_poll_time).proxy() + config.new_cloud_client(), timer, server_calculator, poll_time, max_poll_time).tell_proxy() arvados_node_poller = ArvadosNodeListMonitorActor.start( - arvados_client, timer, poll_time, max_poll_time).proxy() + config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy() job_queue_poller = JobQueueMonitorActor.start( config.new_arvados_client(), timer, server_calculator, - poll_time, max_poll_time).proxy() + poll_time, max_poll_time).tell_proxy() return timer, cloud_node_poller, arvados_node_poller, job_queue_poller _caught_signals = {} @@ -106,26 +111,43 @@ def main(args=None): for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]: signal.signal(sigcode, shutdown_signal) - setup_logging(config.get('Logging', 'file'), **config.log_levels()) - timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \ - launch_pollers(config) - cloud_node_updater = ComputeNodeUpdateActor.start( - config.new_cloud_client).proxy() - node_daemon = NodeManagerDaemonActor.start( - job_queue_poller, arvados_node_poller, cloud_node_poller, - cloud_node_updater, timer, - config.new_arvados_client, config.new_cloud_client, - config.shutdown_windows(), - config.getint('Daemon', 'min_nodes'), - config.getint('Daemon', 'max_nodes'), - config.getint('Daemon', 'poll_stale_after'), - config.getint('Daemon', 'node_stale_after')).proxy() - - signal.pause() - daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set - while not daemon_stopped(): - time.sleep(1) - pykka.ActorRegistry.stop_all() + try: + root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels()) + root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__) + node_setup, node_shutdown, node_update, node_monitor = \ + config.dispatch_classes() + server_calculator = build_server_calculator(config) + timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \ + launch_pollers(config, server_calculator) + cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy() + node_daemon = NodeManagerDaemonActor.start( + job_queue_poller, arvados_node_poller, cloud_node_poller, + cloud_node_updater, timer, + config.new_arvados_client, config.new_cloud_client, + config.shutdown_windows(), + server_calculator, + config.getint('Daemon', 'min_nodes'), + config.getint('Daemon', 'max_nodes'), + config.getint('Daemon', 'poll_stale_after'), + config.getint('Daemon', 'boot_fail_after'), + config.getint('Daemon', 'node_stale_after'), + node_setup, node_shutdown, node_monitor, + max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy() + + WatchdogActor.start(config.getint('Daemon', 'watchdog'), + cloud_node_poller.actor_ref, + arvados_node_poller.actor_ref, + job_queue_poller.actor_ref, + node_daemon.actor_ref) + + signal.pause() + daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set + while not daemon_stopped(): + time.sleep(1) + except Exception: + logging.exception("Uncaught exception during setup") + finally: + pykka.ActorRegistry.stop_all() if __name__ == '__main__':