From ffc7c1b20530dda13c1adf3aa48122ac881bbc29 Mon Sep 17 00:00:00 2001 From: Brett Smith Date: Thu, 13 Nov 2014 15:34:54 -0500 Subject: [PATCH] 4380: Add SLURM dispatcher to Node Manager. --- .../arvnodeman/computenode/dispatch/slurm.py | 49 ++++++++++++++++ services/nodemanager/arvnodeman/config.py | 11 ++++ services/nodemanager/arvnodeman/launcher.py | 9 +-- services/nodemanager/doc/ec2.example.cfg | 5 ++ .../tests/test_computenode_dispatch.py | 10 +++- .../tests/test_computenode_dispatch_slurm.py | 58 +++++++++++++++++++ services/nodemanager/tests/test_config.py | 18 ++++++ 7 files changed, 153 insertions(+), 7 deletions(-) create mode 100644 services/nodemanager/arvnodeman/computenode/dispatch/slurm.py create mode 100644 services/nodemanager/tests/test_computenode_dispatch_slurm.py diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py new file mode 100644 index 0000000000..27397e5d50 --- /dev/null +++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import subprocess +import time + +from . import \ + ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor +from . import ComputeNodeShutdownActor as ShutdownActorBase + +class ComputeNodeShutdownActor(ShutdownActorBase): + def on_start(self): + arv_node = self._monitor.arvados_node.get() + if arv_node is None: + return super(ComputeNodeShutdownActor, self).on_start() + else: + self._nodename = arv_node['hostname'] + self._logger.info("Draining SLURM node %s", self._nodename) + self._later.issue_slurm_drain() + + def _set_node_state(self, state, *args): + cmd = ['scontrol', 'update', 'NodeName=' + self._nodename, + 'State=' + state] + cmd.extend(args) + subprocess.check_output(cmd) + + @ShutdownActorBase._retry((subprocess.CalledProcessError,)) + def cancel_shutdown(self): + self._set_node_state('RESUME') + return super(ComputeNodeShutdownActor, self).cancel_shutdown() + + @ShutdownActorBase._stop_if_window_closed + @ShutdownActorBase._retry((subprocess.CalledProcessError,)) + def issue_slurm_drain(self): + self._set_node_state('DRAIN', 'Reason=Node Manager shutdown') + self._logger.info("Waiting for SLURM node %s to drain", self._nodename) + self._later.await_slurm_drain() + + @ShutdownActorBase._stop_if_window_closed + @ShutdownActorBase._retry((subprocess.CalledProcessError,)) + def await_slurm_drain(self): + output = subprocess.check_output( + ['sinfo', '--noheader', '-o', '%t', '-n', self._nodename]) + if output == 'drain\n': + self._later.shutdown_node() + else: + self._timer.schedule(time.time() + 10, + self._later.await_slurm_drain) diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py index 24fd828cf5..079e623c55 100644 --- a/services/nodemanager/arvnodeman/config.py +++ b/services/nodemanager/arvnodeman/config.py @@ -68,6 +68,17 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser): for key in self.options('Logging') if key not in self.LOGGING_NONLEVELS} + def dispatch_classes(self): + mod_name = 'arvnodeman.computenode.dispatch' + if self.has_option('Daemon', 'dispatcher'): + mod_name = '{}.{}'.format(mod_name, + self.get('Daemon', 'dispatcher')) + module = importlib.import_module(mod_name) + return (module.ComputeNodeSetupActor, + module.ComputeNodeShutdownActor, + module.ComputeNodeUpdateActor, + module.ComputeNodeMonitorActor) + def new_arvados_client(self): if self.has_option('Daemon', 'certs_file'): certs_file = self.get('Daemon', 'certs_file') diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py index d2f4afee06..9f5e1627ea 100644 --- a/services/nodemanager/arvnodeman/launcher.py +++ b/services/nodemanager/arvnodeman/launcher.py @@ -12,7 +12,6 @@ import daemon import pykka from . import config as nmconfig -from .computenode.dispatch import ComputeNodeUpdateActor from .daemon import NodeManagerDaemonActor from .jobqueue import JobQueueMonitorActor, ServerCalculator from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor @@ -107,10 +106,11 @@ def main(args=None): signal.signal(sigcode, shutdown_signal) setup_logging(config.get('Logging', 'file'), **config.log_levels()) + node_setup, node_shutdown, node_update, node_monitor = \ + config.dispatch_classes() timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \ launch_pollers(config) - cloud_node_updater = ComputeNodeUpdateActor.start( - config.new_cloud_client).proxy() + cloud_node_updater = node_update.start(config.new_cloud_client).proxy() node_daemon = NodeManagerDaemonActor.start( job_queue_poller, arvados_node_poller, cloud_node_poller, cloud_node_updater, timer, @@ -119,7 +119,8 @@ def main(args=None): config.getint('Daemon', 'min_nodes'), config.getint('Daemon', 'max_nodes'), config.getint('Daemon', 'poll_stale_after'), - config.getint('Daemon', 'node_stale_after')).proxy() + config.getint('Daemon', 'node_stale_after'), + node_setup, node_shutdown, node_monitor).proxy() signal.pause() daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg index f4b27af701..0f9cacad55 100644 --- a/services/nodemanager/doc/ec2.example.cfg +++ b/services/nodemanager/doc/ec2.example.cfg @@ -2,6 +2,11 @@ # All times are in seconds unless specified otherwise. [Daemon] +# The dispatcher can customize the start and stop procedure for +# cloud nodes. For example, the SLURM dispatcher drains nodes +# through SLURM before shutting them down. +#dispatcher = slurm + # Node Manager will ensure that there are at least this many nodes # running at all times. min_nodes = 0 diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py index b8239f33ff..7f6988dbe9 100644 --- a/services/nodemanager/tests/test_computenode_dispatch.py +++ b/services/nodemanager/tests/test_computenode_dispatch.py @@ -93,8 +93,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase): subscriber.call_args[0][0].actor_ref.actor_urn) -class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin, - unittest.TestCase): +class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin): def make_mocks(self, cloud_node=None, arvados_node=None, shutdown_open=True): self.timer = testutil.MockTimer() @@ -113,7 +112,7 @@ class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin, monitor_actor = dispatch.ComputeNodeMonitorActor.start( self.cloud_node, time.time(), self.shutdowns, self.timer, self.updates, self.arvados_node) - self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start( + self.shutdown_actor = self.ACTOR_CLASS.start( self.timer, self.cloud_client, monitor_actor).proxy() self.monitor_actor = monitor_actor.proxy() @@ -127,6 +126,11 @@ class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin, else: self.fail("success flag {} is not {}".format(last_flag, expected)) + +class ComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin, + unittest.TestCase): + ACTOR_CLASS = dispatch.ComputeNodeShutdownActor + def test_easy_shutdown(self): self.make_actor() self.check_success_flag(True) diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py new file mode 100644 index 0000000000..ccac8b2449 --- /dev/null +++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python + +from __future__ import absolute_import, print_function + +import subprocess +import unittest + +import mock + +import arvnodeman.computenode.dispatch.slurm as slurm_dispatch +from . import testutil +from .test_computenode_dispatch import ComputeNodeShutdownActorMixin + +@mock.patch('subprocess.check_output') +class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin, + unittest.TestCase): + ACTOR_CLASS = slurm_dispatch.ComputeNodeShutdownActor + + def check_slurm_got_args(self, proc_mock, *args): + self.assertTrue(proc_mock.called) + slurm_cmd = proc_mock.call_args[0][0] + for s in args: + self.assertIn(s, slurm_cmd) + + def check_success_after_reset(self, proc_mock): + self.make_mocks(arvados_node=testutil.arvados_node_mock(63)) + self.make_actor() + self.check_success_flag(None, 0) + self.check_success_flag(None, 0) + # Order is critical here: if the mock gets called when no return value + # or side effect is set, we may invoke a real subprocess. + proc_mock.return_value = 'drain\n' + proc_mock.side_effect = None + self.check_success_flag(True, 3) + self.check_slurm_got_args(proc_mock, 'compute63') + + def test_wait_for_drained_state(self, proc_mock): + proc_mock.return_value = 'drng\n' + self.check_success_after_reset(proc_mock) + + def test_retry_failed_slurm_calls(self, proc_mock): + proc_mock.side_effect = subprocess.CalledProcessError(1, ["mock"]) + self.check_success_after_reset(proc_mock) + + def test_slurm_bypassed_when_no_arvados_node(self, proc_mock): + # Test we correctly handle a node that failed to bootstrap. + proc_mock.return_value = 'idle\n' + self.make_actor() + self.check_success_flag(True) + self.assertFalse(proc_mock.called) + + def test_node_undrained_when_shutdown_window_closes(self, proc_mock): + proc_mock.return_value = 'alloc\n' + self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True)) + self.make_actor() + self.check_success_flag(False, 2) + self.check_slurm_got_args(proc_mock, 'NodeName=compute99', + 'State=RESUME') diff --git a/services/nodemanager/tests/test_config.py b/services/nodemanager/tests/test_config.py index 3aa95410c4..d43491e791 100644 --- a/services/nodemanager/tests/test_config.py +++ b/services/nodemanager/tests/test_config.py @@ -6,6 +6,8 @@ import io import logging import unittest +import arvnodeman.computenode.dispatch as dispatch +import arvnodeman.computenode.dispatch.slurm as slurm_dispatch import arvnodeman.config as nmconfig class NodeManagerConfigTestCase(unittest.TestCase): @@ -63,3 +65,19 @@ testlogger = INFO self.assertEqual({'level': logging.DEBUG, 'testlogger': logging.INFO}, config.log_levels()) + + def check_dispatch_classes(self, config, module): + setup, shutdown, update, monitor = config.dispatch_classes() + self.assertIs(setup, module.ComputeNodeSetupActor) + self.assertIs(shutdown, module.ComputeNodeShutdownActor) + self.assertIs(update, module.ComputeNodeUpdateActor) + self.assertIs(monitor, module.ComputeNodeMonitorActor) + + def test_default_dispatch(self): + config = self.load_config() + self.check_dispatch_classes(config, dispatch) + + def test_custom_dispatch(self): + config = self.load_config( + config_str=self.TEST_CONFIG + "[Daemon]\ndispatcher=slurm\n") + self.check_dispatch_classes(config, slurm_dispatch) -- 2.30.2