Merge branch 'master' into 13804-no-shutdown-wanted-nodes
[arvados.git] / services / nodemanager / tests / test_daemon.py
index 04ff9b6d79962922ea8a3327edc726db528b524e..1b6e4ca8da4aa24bfb45f8382e7b5d7700cd2bf2 100644 (file)
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
@@ -14,10 +17,33 @@ from arvnodeman.jobqueue import ServerCalculator
 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
 from . import test_status
+from . import pykka_timeout
 import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
+
+    def assertwait(self, f, timeout=pykka_timeout*2):
+        deadline = time.time() + timeout
+        while True:
+            try:
+                return f()
+            except AssertionError:
+                if time.time() > deadline:
+                    raise
+                pass
+            time.sleep(.1)
+            self.daemon.ping().get(self.TIMEOUT)
+
+    def busywait(self, f):
+        for n in xrange(200):
+            ok = f()
+            if ok:
+                return
+            time.sleep(.1)
+            self.daemon.ping().get(self.TIMEOUT)
+        self.assertTrue(ok) # always falsy, but not necessarily False
+
     def mock_node_start(self, **kwargs):
         # Make sure that every time the daemon starts a setup actor,
         # it gets a new mock object back.
@@ -65,8 +91,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         self.arv_factory = mock.MagicMock(name='arvados_mock')
         api_client = mock.MagicMock(name='api_client')
-        api_client.nodes().create().execute.side_effect = [testutil.arvados_node_mock(1),
-                                                           testutil.arvados_node_mock(2)]
+        api_client.nodes().create().execute.side_effect = \
+            [testutil.arvados_node_mock(1),
+             testutil.arvados_node_mock(2)]
         self.arv_factory.return_value = api_client
 
         self.cloud_factory = mock.MagicMock(name='cloud_mock')
@@ -99,14 +126,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
 
     def monitor_list(self):
-        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
+        return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
 
-    def monitored_arvados_nodes(self):
+    def monitored_arvados_nodes(self, include_unpaired=True):
         pairings = []
         for future in [actor.proxy().arvados_node
                        for actor in self.monitor_list()]:
             try:
-                pairings.append(future.get(self.TIMEOUT))
+                g = future.get(self.TIMEOUT)
+                if g or include_unpaired:
+                    pairings.append(g)
             except pykka.ActorDeadError:
                 pass
         return pairings
@@ -114,6 +143,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def alive_monitor_count(self):
         return len(self.monitored_arvados_nodes())
 
+    def paired_monitor_count(self):
+        return len(self.monitored_arvados_nodes(False))
+
     def assertShutdownCancellable(self, expected=True):
         self.assertTrue(self.node_shutdown.start.called)
         self.assertIs(expected,
@@ -123,17 +155,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_easy_node_creation(self):
         size = testutil.MockSize(1)
         self.make_daemon(want_sizes=[size])
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: self.node_setup.start.called)
+        self.assertIn('node_quota', status.tracker._latest)
 
     def check_monitors_arvados_nodes(self, *arv_nodes):
-        self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
+        self.assertwait(lambda: self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes()))
 
     def test_node_pairing(self):
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon([cloud_node], [arv_node])
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_node_pairing_after_arvados_update(self):
@@ -142,7 +173,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          [testutil.arvados_node_mock(1, ip_address=None)])
         arv_node = testutil.arvados_node_mock(2)
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_arvados_node_un_and_re_paired(self):
@@ -154,9 +184,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
         self.check_monitors_arvados_nodes(arv_node)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.assertEqual(0, self.alive_monitor_count())
+        self.busywait(lambda: 0 == self.alive_monitor_count())
         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_old_arvados_node_not_double_assigned(self):
@@ -176,8 +205,40 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_node_count_satisfied(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
                          want_sizes=[testutil.MockSize(1)])
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
+
+    def test_select_stale_node_records_with_slot_numbers_first(self):
+        """
+        Stale node records with slot_number assigned can exist when
+        clean_arvados_node() isn't executed after a node shutdown, for
+        various reasons.
+        NodeManagerDaemonActor should use these stale node records first, so
+        that they don't accumulate unused, reducing the slots available.
+        """
+        size = testutil.MockSize(1)
+        a_long_time_ago = '1970-01-01T01:02:03.04050607Z'
+        arvados_nodes = []
+        for n in range(9):
+            # Add several stale node records without slot_number assigned
+            arvados_nodes.append(
+                testutil.arvados_node_mock(
+                    n+1,
+                    slot_number=None,
+                    modified_at=a_long_time_ago))
+        # Add one record with slot_number assigned, it should be the
+        # first one selected
+        arv_node = testutil.arvados_node_mock(
+            123,
+            modified_at=a_long_time_ago)
+        arvados_nodes.append(arv_node)
+        cloud_node = testutil.cloud_node_mock(125, size=size)
+        self.make_daemon(cloud_nodes=[cloud_node],
+                         arvados_nodes=arvados_nodes)
+        arvados_nodes_tracker = self.daemon.arvados_nodes.get()
+        # Here, find_stale_node() should return the node record with
+        # the slot_number assigned.
+        self.assertEqual(arv_node,
+                         arvados_nodes_tracker.find_stale_node(3601))
 
     def test_dont_count_missing_as_busy(self):
         size = testutil.MockSize(1)
@@ -188,8 +249,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                             2,
                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size, size])
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: 2 == self.alive_monitor_count())
+        self.busywait(lambda: self.node_setup.start.called)
 
     def test_missing_counts_towards_max(self):
         size = testutil.MockSize(1)
@@ -199,8 +260,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size, size],
                          max_nodes=2)
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_excess_counts_missing(self):
         size = testutil.MockSize(1)
@@ -209,7 +269,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -221,7 +281,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         get_cloud_node = mock.MagicMock(name="get_cloud_node")
         get_cloud_node.get.return_value = cloud_nodes[1]
         mock_node_monitor = mock.MagicMock()
@@ -230,10 +290,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
 
-        self.assertEqual(2, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.alive_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
-        self.assertEqual(1, self.node_shutdown.start.call_count)
+        self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
 
     def test_booting_nodes_counted(self):
         cloud_node = testutil.cloud_node_mock(1)
@@ -243,17 +303,15 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.max_nodes.get(self.TIMEOUT)
         self.assertTrue(self.node_setup.start.called)
         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
 
     def test_boot_new_node_when_all_nodes_busy(self):
         size = testutil.MockSize(2)
         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
                          [size], avail_sizes=[(size, {"cores":1})])
-        self.busywait(lambda: self.node_setup.start.called)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
+        self.assertwait(lambda: self.assertEqual(1, self.node_setup.start.called))
 
     def test_boot_new_node_below_min_nodes(self):
         min_size = testutil.MockSize(1)
@@ -305,7 +363,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(4)
         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
         setup = self.start_node_boot(cloud_node, arv_node)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_arvados_nodes([arv_node])
         self.daemon.update_cloud_nodes([cloud_node])
@@ -322,13 +380,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(1)
         setup = self.start_node_boot(cloud_node)
         self.daemon.update_cloud_nodes([cloud_node])
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
 
     def test_no_duplication_when_booted_node_listed(self):
         cloud_node = testutil.cloud_node_mock(2)
         setup = self.start_node_boot(cloud_node, id_num=2)
-        self.daemon.node_up(setup)
+        self.daemon.node_setup_finished(setup)
         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
 
@@ -337,14 +395,14 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         # even it doesn't appear in the listing (e.g., because of delays
         # propagating tags).
         setup = self.start_node_boot()
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
 
     def test_booted_unlisted_node_counted(self):
         setup = self.start_node_boot(id_num=1)
-        self.daemon.node_up(setup)
+        self.daemon.node_setup_finished(setup)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(1)]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -352,7 +410,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_booted_node_can_shutdown(self):
         setup = self.start_node_boot()
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.update_server_wishlist([])
@@ -370,7 +428,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_booted_node_lifecycle(self):
         cloud_node = testutil.cloud_node_mock(6)
         setup = self.start_node_boot(cloud_node, id_num=6)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.update_server_wishlist([])
@@ -390,28 +448,47 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_setup.start.called,
                         "second node not started after booted node stopped")
 
+    def test_node_disappearing_during_shutdown(self):
+        cloud_node = testutil.cloud_node_mock(6)
+        setup = self.start_node_boot(cloud_node, id_num=6)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+        monitor = self.monitor_list()[0].proxy()
+        self.daemon.update_server_wishlist([])
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertShutdownCancellable(True)
+        shutdown = self.node_shutdown.start().proxy()
+        shutdown.cloud_node.get.return_value = cloud_node
+        # Simulate a successful but slow node destroy call: the cloud node
+        # list gets updated before the ShutdownActor finishes.
+        record = self.daemon.cloud_nodes.get().nodes.values()[0]
+        self.assertTrue(record.shutdown_actor is not None)
+        self.daemon.cloud_nodes.get().nodes.clear()
+        self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+        self.assertTrue(
+            record.shutdown_actor is not None,
+            "test was ineffective -- failed to simulate the race condition")
+
     def test_booted_node_shut_down_when_never_listed(self):
         setup = self.start_node_boot()
         self.cloud_factory().node_start_time.return_value = time.time() - 3601
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.assertFalse(self.node_shutdown.start.called)
         now = time.time()
         self.monitor_list()[0].tell_proxy().consider_shutdown()
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_booted_node_shut_down_when_never_paired(self):
         cloud_node = testutil.cloud_node_mock(2)
         setup = self.start_node_boot(cloud_node)
         self.cloud_factory().node_start_time.return_value = time.time() - 3601
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([cloud_node])
         self.monitor_list()[0].tell_proxy().consider_shutdown()
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_booted_node_shut_down_when_never_working(self):
@@ -419,19 +496,18 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
         setup = self.start_node_boot(cloud_node, arv_node)
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
         self.daemon.update_cloud_nodes([cloud_node])
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_node_that_pairs_not_considered_failed_boot(self):
         cloud_node = testutil.cloud_node_mock(3)
         arv_node = testutil.arvados_node_mock(3)
         setup = self.start_node_boot(cloud_node, arv_node)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([cloud_node])
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
@@ -443,7 +519,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(5)
         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
         setup = self.start_node_boot(cloud_node, arv_node)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([cloud_node])
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
@@ -454,8 +530,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_booting_nodes_shut_down(self):
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
+        self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
 
     def test_all_booting_nodes_tried_to_shut_down(self):
         size = testutil.MockSize(2)
@@ -480,7 +555,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(1)
         size = testutil.MockSize(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
-        self.assertEqual(1, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -490,7 +565,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -498,11 +573,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_shutdown_accepted_below_capacity(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_shutdown.start.called)
+        self.busywait(lambda: self.node_shutdown.start.called)
 
     def test_shutdown_declined_when_idle_and_job_queued(self):
         size = testutil.MockSize(1)
@@ -510,7 +584,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                      testutil.arvados_node_mock(4, job_uuid=None)]
         self.make_daemon(cloud_nodes, arv_nodes, [size])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             monitor = mon_ref.proxy()
             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
@@ -529,13 +603,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = False
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
 
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = True
         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.assertEqual(0, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(0, self.paired_monitor_count()))
 
     def test_nodes_shutting_down_replaced_below_max_nodes(self):
         size = testutil.MockSize(6)
@@ -546,23 +620,37 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.assertTrue(self.node_shutdown.start.called)
+        getmock = mock.MagicMock()
+        getmock.get.return_value = False
+        self.last_shutdown.cancel_shutdown.return_value = getmock
         self.daemon.update_server_wishlist(
             [testutil.MockSize(6)]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: self.node_setup.start.called)
+
+    def test_nodes_shutting_down_cancelled(self):
+        size = testutil.MockSize(6)
+        cloud_node = testutil.cloud_node_mock(6, size=size)
+        self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
+                         avail_sizes=[(size, {"cores":1})])
+        self.assertEqual(1, self.alive_monitor_count())
+        monitor = self.monitor_list()[0].proxy()
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertTrue(self.node_shutdown.start.called)
+        self.daemon.update_server_wishlist(
+            [testutil.MockSize(6)]).get(self.TIMEOUT)
+        self.busywait(lambda: self.last_shutdown.cancel_shutdown.called)
 
     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
         cloud_node = testutil.cloud_node_mock(7)
         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                          max_nodes=1)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.assertTrue(self.node_shutdown.start.called)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(7)]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_nodes_shutting_down_count_against_excess(self):
         size = testutil.MockSize(8)
@@ -570,7 +658,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
         self.make_daemon(cloud_nodes, arv_nodes, [size],
                          avail_sizes=[(size, {"cores":1})])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -581,7 +669,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         new_node.stop_if_no_cloud_node.reset_mock()
         self.daemon.shutdown().get(self.TIMEOUT)
         self.assertTrue(new_node.stop_if_no_cloud_node.called)
-        self.daemon.node_up(new_node).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
         self.assertTrue(new_node.stop.called)
         self.timer.deliver()
         self.assertTrue(
@@ -595,8 +683,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         size = testutil.MockSize(2)
         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
         self.timer.deliver()
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
 
     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
@@ -604,9 +691,28 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(
-            1, self.last_shutdown.stop.call_count)
+        self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
+
+    def test_idle_node_disappearing_clears_status_idle_time_counter(self):
+        size = testutil.MockSize(1)
+        status.tracker._idle_nodes = {}
+        cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
+        arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
+        self.make_daemon(cloud_nodes, arv_nodes, [size])
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
+        for mon_ref in self.monitor_list():
+            monitor = mon_ref.proxy()
+            if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
+                break
+        else:
+            self.fail("monitor for idle node not found")
+        self.assertEqual(1, status.tracker.get('nodes_idle'))
+        hostname = monitor.arvados_node.get()['hostname']
+        self.assertIn(hostname, status.tracker._idle_nodes)
+        # Simulate the node disappearing from the cloud node list
+        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
+        self.busywait(lambda: 0 == self.alive_monitor_count())
+        self.assertNotIn(hostname, status.tracker._idle_nodes)
 
     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
@@ -617,8 +723,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         # the ActorDeadError.
         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.last_shutdown.stop.call_count)
+        self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
 
     def test_node_create_two_sizes(self):
         small = testutil.MockSize(1)
@@ -645,7 +750,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         big = testutil.MockSize(2)
         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
                         (testutil.MockSize(2), {"cores":2})]
-        self.make_daemon(want_sizes=[small, small, small, big],
+        self.make_daemon(want_sizes=[small, small, big, small],
                          avail_sizes=avail_sizes, max_nodes=3)
 
         # the daemon runs in another thread, so we need to wait and see
@@ -659,6 +764,27 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertEqual(2, sizecounts[small.id])
         self.assertEqual(1, sizecounts[big.id])
 
+    def test_wishlist_ordering(self):
+        # Check that big nodes aren't prioritized; since #12199 containers are
+        # scheduled on specific node sizes.
+        small = testutil.MockSize(1)
+        big = testutil.MockSize(2)
+        avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+                        (testutil.MockSize(2), {"cores":2})]
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes, max_nodes=3)
+
+        # the daemon runs in another thread, so we need to wait and see
+        # if it does all the work we're expecting it to do before stopping it.
+        self.busywait(lambda: self.node_setup.start.call_count == 3)
+        booting = self.daemon.booting.get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in booting.itervalues():
+            sizecounts[b.cloud_size.get().id] += 1
+        self.assertEqual(3, sizecounts[small.id])
+        self.assertEqual(0, sizecounts[big.id])
+
     def test_wishlist_reconfigure(self):
         small = testutil.MockSize(1)
         big = testutil.MockSize(2)
@@ -672,7 +798,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(3)],
                          want_sizes=[small, small, big],
                          avail_sizes=avail_sizes)
-
+        self.assertwait(lambda: self.assertEqual(3, self.paired_monitor_count()))
         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
 
         self.assertEqual(0, self.node_shutdown.start.call_count)
@@ -683,10 +809,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         booting = self.daemon.booting.get()
         cloud_nodes = self.daemon.cloud_nodes.get()
 
-        self.stop_proxy(self.daemon)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
 
-        self.assertEqual(1, self.node_setup.start.call_count)
-        self.assertEqual(1, self.node_shutdown.start.call_count)
+        self.stop_proxy(self.daemon)
 
         # booting a new big node
         sizecounts = {a[0].id: 0 for a in avail_sizes}