4293: Node Manager shuts down nodes that fail to boot.
author    Brett Smith <brett@curoverse.com>
          Fri, 5 Dec 2014 22:27:37 +0000 (17:27 -0500)
committer Brett Smith <brett@curoverse.com>
          Wed, 10 Dec 2014 13:03:50 +0000 (08:03 -0500)
This lets Node Manager detect when a compute node fails to bootstrap
and correct the problem by shutting that node down.

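At a high level, the change works like this: when a setup actor reports a cloud
node up, the daemon records it as booted but unpaired and schedules a check
boot_fail_after seconds later; if the node still has no paired Arvados node
when the check fires, the daemon starts a shutdown that the usual eligibility
checks cannot cancel. The sketch below is a simplified, self-contained
illustration of that flow with hypothetical names modeled on the real actors;
the actual implementation uses Pykka actors and a timer actor rather than
threading.Timer.

    import threading

    class BootFailWatcherSketch(object):
        """Hypothetical illustration of the new boot-failure handling."""

        def __init__(self, boot_fail_after=1800):
            self.boot_fail_after = boot_fail_after
            self.arvados_pairings = {}   # cloud node id -> Arvados node or None

        def node_up(self, cloud_node_id):
            # The node booted but is not yet paired; check back later.
            self.arvados_pairings[cloud_node_id] = None
            timer = threading.Timer(self.boot_fail_after,
                                    self.shutdown_unpaired_node, [cloud_node_id])
            timer.daemon = True
            timer.start()

        def node_paired(self, cloud_node_id, arvados_node):
            self.arvados_pairings[cloud_node_id] = arvados_node

        def shutdown_unpaired_node(self, cloud_node_id):
            if self.arvados_pairings.get(cloud_node_id) is None:
                # Misbehaving node: this shutdown ignores shutdown windows.
                self.begin_shutdown(cloud_node_id, cancellable=False)

        def begin_shutdown(self, cloud_node_id, cancellable):
            print("shutting down %s (cancellable=%s)" % (cloud_node_id, cancellable))
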
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/doc/ec2.example.cfg
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/testutil.py

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index ae0a65b9317e86f8d53afa00ba7b9483f047c4aa..c79d8f9588fbea18e276fd1e8f30474b3ac7677e 100644
@@ -136,12 +136,18 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     This actor simply destroys a cloud node, retrying as needed.
     """
     def __init__(self, timer_actor, cloud_client, node_monitor,
-                 retry_wait=1, max_retry_wait=180):
+                 cancellable=True, retry_wait=1, max_retry_wait=180):
+        # If a ShutdownActor is cancellable, it will ask the
+        # ComputeNodeMonitorActor if it's still eligible before taking each
+        # action, and stop the shutdown process if the node is no longer
+        # eligible.  Normal shutdowns based on job demand should be
+        # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
             'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
         self._cloud = cloud_client
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
+        self.cancellable = cancellable
         self.success = None
 
     def on_start(self):
@@ -154,7 +160,8 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
         def wrapper(self, *args, **kwargs):
-            if not self._monitor.shutdown_eligible().get():
+            if (self.cancellable and
+                  (not self._monitor.shutdown_eligible().get())):
                 self._logger.info(
                     "Cloud node %s shutdown cancelled - no longer eligible.",
                     self.cloud_node.id)
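
Every shutdown step in the real actor is wrapped with _stop_if_window_closed,
so the cancellable flag decides whether the eligibility check can abort the
shutdown. Below is a standalone sketch of the same pattern, using hypothetical
names rather than the actual actor class:

    import functools

    def stop_if_window_closed(orig_func):
        # Mirrors the decorator above: skip the step only when a cancellable
        # shutdown is no longer eligible.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self.cancellable and not self.shutdown_eligible():
                print("shutdown cancelled - no longer eligible")
                return None
            return orig_func(self, *args, **kwargs)
        return wrapper

    class ShutdownSketch(object):
        def __init__(self, cancellable, eligible):
            self.cancellable = cancellable
            self._eligible = eligible

        def shutdown_eligible(self):
            return self._eligible

        @stop_if_window_closed
        def destroy_node(self):
            print("node destroyed")

    # A demand-based shutdown stops when the window closes...
    ShutdownSketch(cancellable=True, eligible=False).destroy_node()
    # ...but a shutdown for a failed boot proceeds regardless.
    ShutdownSketch(cancellable=False, eligible=False).destroy_node()
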
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index 079e623c559e477ec37b0435a60f3574cdbefacb..f018015717c150b0065212248f35bbfbf3d1d4a7 100644
@@ -6,6 +6,7 @@ import ConfigParser
 import importlib
 import logging
 import ssl
+import sys
 
 import arvados
 import httplib2
@@ -42,6 +43,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                        'poll_time': '60',
                        'max_poll_time': '300',
                        'poll_stale_after': '600',
+                       'boot_fail_after': str(sys.maxint),
                        'node_stale_after': str(60 * 60 * 2)},
             'Logging': {'file': '/dev/stderr',
                         'level': 'WARNING'},
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 9f22568faafbb1c45d9625816fb1c0027226882a..d03d1450bbe67bfdb0184293010e987b14e2d6fc 100644
@@ -26,14 +26,16 @@ class _BaseNodeTracker(object):
         self.nodes = {}
         self.orphans = {}
 
-    def __getitem__(self, key):
-        return self.nodes[key]
-
-    def __len__(self):
-        return len(self.nodes)
+    # Proxy the methods listed below to self.nodes.
+    def _proxy_method(name):
+        method = getattr(dict, name)
+        @functools.wraps(method, ('__name__', '__doc__'))
+        def wrapper(self, *args, **kwargs):
+            return method(self.nodes, *args, **kwargs)
+        return wrapper
 
-    def get(self, key, default=None):
-        return self.nodes.get(key, default)
+    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
+        locals()[_method_name] = _proxy_method(_method_name)
 
     def record_key(self, record):
         return self.item_key(getattr(record, self.RECORD_ATTR))
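
The loop above assigns through locals() inside the class body, which in
CPython writes directly into the class namespace being built, so each listed
dict method becomes a pass-through to self.nodes. A trimmed-down, standalone
illustration of the idiom and the behavior it produces:

    import functools

    class DictProxySketch(object):
        def __init__(self):
            self.nodes = {}

        def _proxy_method(name):
            method = getattr(dict, name)
            @functools.wraps(method, ('__name__', '__doc__'))
            def wrapper(self, *args, **kwargs):
                return method(self.nodes, *args, **kwargs)
            return wrapper

        # Installing the wrappers via locals() relies on the class body's
        # namespace being an ordinary dict (true in CPython).
        for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
            locals()[_method_name] = _proxy_method(_method_name)

    proxy = DictProxySketch()
    proxy.nodes['node-1'] = 'record'
    print("%s %d %r" % ('node-1' in proxy, len(proxy), proxy.get('missing')))
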
@@ -96,7 +98,9 @@ class NodeManagerDaemonActor(actor_class):
                  cloud_nodes_actor, cloud_update_actor, timer_actor,
                  arvados_factory, cloud_factory,
                  shutdown_windows, min_nodes, max_nodes,
-                 poll_stale_after=600, node_stale_after=7200,
+                 poll_stale_after=600,
+                 boot_fail_after=1800,
+                 node_stale_after=7200,
                  node_setup_class=dispatch.ComputeNodeSetupActor,
                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                  node_actor_class=dispatch.ComputeNodeMonitorActor):
@@ -115,6 +119,7 @@ class NodeManagerDaemonActor(actor_class):
         self.min_nodes = min_nodes
         self.max_nodes = max_nodes
         self.poll_stale_after = poll_stale_after
+        self.boot_fail_after = boot_fail_after
         self.node_stale_after = node_stale_after
         self.last_polls = {}
         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
@@ -269,15 +274,15 @@ class NodeManagerDaemonActor(actor_class):
         return pykka.get_all([getattr(actor, name) for name in attr_names])
 
     def node_up(self, setup_proxy):
-        cloud_node, arvados_node = self._get_actor_attrs(
-            setup_proxy, 'cloud_node', 'arvados_node')
+        cloud_node = setup_proxy.cloud_node.get()
         del self.booting[setup_proxy.actor_ref.actor_urn]
         setup_proxy.stop()
         record = self.cloud_nodes.get(cloud_node.id)
         if record is None:
             record = self._new_node(cloud_node)
             self.booted[cloud_node.id] = record
-        self._pair_nodes(record, arvados_node)
+        self._timer.schedule(time.time() + self.boot_fail_after,
+                             self._later.shutdown_unpaired_node, cloud_node.id)
 
     @_check_poll_freshness
     def stop_booting_node(self):
@@ -292,19 +297,31 @@ class NodeManagerDaemonActor(actor_class):
                     self._later.stop_booting_node()
                 break
 
-    @_check_poll_freshness
-    def node_can_shutdown(self, node_actor):
-        if self._nodes_excess() < 1:
-            return None
+    def _begin_node_shutdown(self, node_actor, cancellable):
         cloud_node_id = node_actor.cloud_node.get().id
         if cloud_node_id in self.shutdowns:
             return None
         shutdown = self._node_shutdown.start(
             timer_actor=self._timer, cloud_client=self._new_cloud(),
-            node_monitor=node_actor.actor_ref).proxy()
+            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
         self.shutdowns[cloud_node_id] = shutdown
         shutdown.subscribe(self._later.node_finished_shutdown)
 
+    @_check_poll_freshness
+    def node_can_shutdown(self, node_actor):
+        if self._nodes_excess() > 0:
+            self._begin_node_shutdown(node_actor, cancellable=True)
+
+    def shutdown_unpaired_node(self, cloud_node_id):
+        for record_dict in [self.cloud_nodes, self.booted]:
+            if cloud_node_id in record_dict:
+                record = record_dict[cloud_node_id]
+                break
+        else:
+            return None
+        if record.arvados_node is None:
+            self._begin_node_shutdown(record.actor, cancellable=False)
+
     def node_finished_shutdown(self, shutdown_actor):
         success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
                                                     'cloud_node')
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 9f5e1627eaa1b1d808c87190dc1c40c2c15f5d3b..5fa404fcbbdf4df95d435ab2a73afbc2abe7bc97 100644
@@ -119,6 +119,7 @@ def main(args=None):
         config.getint('Daemon', 'min_nodes'),
         config.getint('Daemon', 'max_nodes'),
         config.getint('Daemon', 'poll_stale_after'),
+        config.getint('Daemon', 'boot_fail_after'),
         config.getint('Daemon', 'node_stale_after'),
         node_setup, node_shutdown, node_monitor).proxy()
 
diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg
index 0f9cacad55601b88316e7513155ab3790900559c..024ed2b59b3089676b520aec5212caa1caa470ba 100644
@@ -27,6 +27,12 @@ max_poll_time = 300
 # information is too outdated.
 poll_stale_after = 600
 
+# If Node Manager boots a cloud node, and it does not pair with an Arvados
+# node before this long, assume that there was a cloud bootstrap failure and
+# shut it down.  Note that normal shutdown windows apply (see the Cloud
+# section), so this should be shorter than the first shutdown window value.
+boot_fail_after = 1800
+
 # "Node stale time" affects two related behaviors.
 # 1. If a compute node has been running for at least this long, but it
 # isn't paired with an Arvados node, do not shut it down, but leave it alone.
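
As the comment says, boot_fail_after should be shorter than the first shutdown
window so the forced shutdown is not delayed waiting for a window to open. The
snippet below is a hypothetical sanity check of that relationship; it assumes
a shutdown_windows option in the Cloud section whose values are minutes, and
it is not part of Node Manager:

    import ConfigParser

    def check_boot_fail_after(path):
        # Hypothetical helper: warn when boot_fail_after (seconds) is not
        # shorter than the first shutdown window (assumed to be in minutes).
        config = ConfigParser.SafeConfigParser()
        config.read(path)
        boot_fail_after = config.getint('Daemon', 'boot_fail_after')
        first_window = float(config.get('Cloud', 'shutdown_windows').split(',')[0])
        if boot_fail_after >= first_window * 60:
            print("boot_fail_after (%ss) is not shorter than the first "
                  "shutdown window (%s minutes)" % (boot_fail_after, first_window))

    check_boot_fail_after('ec2.example.cfg')
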
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index 7f6988dbe9df21dce1797698156f5a6e6390e517..c86dcfdca21d314f20b5e7586cd2988389fc32ae 100644
@@ -106,14 +106,14 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         self.cloud_node = cloud_node
         self.arvados_node = arvados_node
 
-    def make_actor(self):
+    def make_actor(self, cancellable=True):
         if not hasattr(self, 'timer'):
             self.make_mocks()
         monitor_actor = dispatch.ComputeNodeMonitorActor.start(
             self.cloud_node, time.time(), self.shutdowns, self.timer,
             self.updates, self.arvados_node)
         self.shutdown_actor = self.ACTOR_CLASS.start(
-            self.timer, self.cloud_client, monitor_actor).proxy()
+            self.timer, self.cloud_client, monitor_actor, cancellable).proxy()
         self.monitor_actor = monitor_actor.proxy()
 
     def check_success_flag(self, expected, allow_msg_count=1):
@@ -126,6 +126,15 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         else:
             self.fail("success flag {} is not {}".format(last_flag, expected))
 
+    def test_uncancellable_shutdown(self, *mocks):
+        self.make_mocks(shutdown_open=False)
+        self.cloud_client.destroy_node.return_value = False
+        self.make_actor(cancellable=False)
+        self.check_success_flag(None, 0)
+        self.shutdowns._set_state(True, 600)
+        self.cloud_client.destroy_node.return_value = True
+        self.check_success_flag(True)
+
 
 class ComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
                                        unittest.TestCase):
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 31a682ffd8b682be156ea71f563da39cc707ba3e..0f146a162a9ea6d585265249527ad7a99a774f82 100644
@@ -22,14 +22,14 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.cloud_factory = mock.MagicMock(name='cloud_mock')
         self.cloud_factory().node_start_time.return_value = time.time()
         self.cloud_updates = mock.MagicMock(name='updates_mock')
-        self.timer = testutil.MockTimer()
+        self.timer = testutil.MockTimer(deliver_immediately=False)
         self.node_setup = mock.MagicMock(name='setup_mock')
         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
             self.server_wishlist_poller, self.arvados_nodes_poller,
             self.cloud_nodes_poller, self.cloud_updates, self.timer,
             self.arv_factory, self.cloud_factory,
-            [54, 5, 1], min_nodes, max_nodes, 600, 3600,
+            [54, 5, 1], min_nodes, max_nodes, 600, 1800, 3600,
             self.node_setup, self.node_shutdown).proxy()
         if cloud_nodes is not None:
             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
@@ -44,6 +44,12 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def alive_monitor_count(self):
         return sum(1 for actor in self.monitor_list() if actor.is_alive())
 
+    def assertShutdownCancellable(self, expected=True):
+        self.assertTrue(self.node_shutdown.start.called)
+        self.assertIs(expected,
+                      self.node_shutdown.start.call_args[1]['cancellable'],
+                      "ComputeNodeShutdownActor incorrectly cancellable")
+
     def test_easy_node_creation(self):
         size = testutil.MockSize(1)
         self.make_daemon(want_sizes=[size])
@@ -195,8 +201,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.update_server_wishlist([])
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
-        self.assertTrue(self.node_shutdown.start.called,
-                        "daemon did not shut down booted node on offer")
+        self.assertShutdownCancellable(True)
         shutdown = self.node_shutdown.start().proxy()
         shutdown.cloud_node.get.return_value = cloud_node
         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
@@ -210,6 +215,37 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_setup.start.called,
                         "second node not started after booted node stopped")
 
+    def test_booted_node_shut_down_when_never_listed(self):
+        setup = self.start_node_boot()
+        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+        self.assertFalse(self.node_shutdown.start.called)
+        self.timer.deliver()
+        self.stop_proxy(self.daemon)
+        self.assertShutdownCancellable(False)
+
+    def test_booted_node_shut_down_when_never_paired(self):
+        cloud_node = testutil.cloud_node_mock(2)
+        setup = self.start_node_boot(cloud_node)
+        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+        self.daemon.update_cloud_nodes([cloud_node])
+        self.timer.deliver()
+        self.stop_proxy(self.daemon)
+        self.assertShutdownCancellable(False)
+
+    def test_node_that_pairs_not_considered_failed_boot(self):
+        cloud_node = testutil.cloud_node_mock(3)
+        arv_node = testutil.arvados_node_mock(3)
+        setup = self.start_node_boot(cloud_node, arv_node)
+        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+        self.daemon.update_cloud_nodes([cloud_node])
+        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+        self.timer.deliver()
+        self.stop_proxy(self.daemon)
+        self.assertFalse(self.node_shutdown.start.called)
+
     def test_booting_nodes_shut_down(self):
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
@@ -317,6 +353,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(new_node.stop_if_no_cloud_node.called)
         self.daemon.node_up(new_node).get(self.TIMEOUT)
         self.assertTrue(new_node.stop.called)
+        self.timer.deliver()
         self.assertTrue(
             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
 
@@ -325,5 +362,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.make_daemon(want_sizes=[size])
         self.daemon.shutdown().get(self.TIMEOUT)
         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
+        self.timer.deliver()
         self.stop_proxy(self.daemon)
         self.assertEqual(1, self.node_setup.start.call_count)
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index 56f22c8e08fc66f69c90422e30bb116b372dfbf6..30808ac73816e9056d6ee8c91025305e7570520e 100644
@@ -2,6 +2,7 @@
 
 from __future__ import absolute_import, print_function
 
+import threading
 import time
 
 import mock
@@ -62,8 +63,23 @@ class MockSize(object):
 
 
 class MockTimer(object):
+    def __init__(self, deliver_immediately=True):
+        self.deliver_immediately = deliver_immediately
+        self.messages = []
+        self.lock = threading.Lock()
+
+    def deliver(self):
+        with self.lock:
+            to_deliver = self.messages
+            self.messages = []
+        for callback, args, kwargs in to_deliver:
+            callback(*args, **kwargs)
+
     def schedule(self, want_time, callback, *args, **kwargs):
-        return callback(*args, **kwargs)
+        with self.lock:
+            self.messages.append((callback, args, kwargs))
+        if self.deliver_immediately:
+            self.deliver()
 
 
 class ActorTestMixin(object):
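
With deliver_immediately=False, scheduled callbacks queue up until the test
calls deliver(); this is how the daemon tests above control exactly when the
boot-failure check fires. A minimal usage sketch, assuming the module above is
importable as testutil (the import path is an assumption):

    import time
    from testutil import MockTimer   # assumed import path for the module above

    calls = []
    timer = MockTimer(deliver_immediately=False)
    timer.schedule(time.time() + 1800, calls.append, 'boot fail check')
    assert calls == []               # nothing runs until the test decides
    timer.deliver()
    assert calls == ['boot fail check']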