Merge branch 'master' into 13804-no-shutdown-wanted-nodes
[arvados.git] / services / nodemanager / tests / test_daemon.py
index 7da250be26bae00a0bc7a2c156a7514051b30aa5..1b6e4ca8da4aa24bfb45f8382e7b5d7700cd2bf2 100644 (file)
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
@@ -9,13 +12,38 @@ import mock
 import pykka
 
 import arvnodeman.daemon as nmdaemon
+import arvnodeman.status as status
 from arvnodeman.jobqueue import ServerCalculator
 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
+from . import test_status
+from . import pykka_timeout
 import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
+
+    def assertwait(self, f, timeout=pykka_timeout*2):
+        deadline = time.time() + timeout
+        while True:
+            try:
+                return f()
+            except AssertionError:
+                if time.time() > deadline:
+                    raise
+                pass
+            time.sleep(.1)
+            self.daemon.ping().get(self.TIMEOUT)
+
+    def busywait(self, f):
+        for n in xrange(200):
+            ok = f()
+            if ok:
+                return
+            time.sleep(.1)
+            self.daemon.ping().get(self.TIMEOUT)
+        self.assertTrue(ok) # always falsy, but not necessarily False
+
     def mock_node_start(self, **kwargs):
         # Make sure that every time the daemon starts a setup actor,
         # it gets a new mock object back.
@@ -48,18 +76,32 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         return mock_actor
 
     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
-                    avail_sizes=[(testutil.MockSize(1), {"cores": 1})],
+                    avail_sizes=None,
                     min_nodes=0, max_nodes=8,
                     shutdown_windows=[54, 5, 1],
                     max_total_price=None):
         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
+
+        if not avail_sizes:
+            if cloud_nodes or want_sizes:
+                avail_sizes=[(c.size, {"cores": int(c.id)}) for c in cloud_nodes] + [(s, {"cores": 1}) for s in want_sizes]
+            else:
+                avail_sizes=[(testutil.MockSize(1), {"cores": 1})]
+
         self.arv_factory = mock.MagicMock(name='arvados_mock')
+        api_client = mock.MagicMock(name='api_client')
+        api_client.nodes().create().execute.side_effect = \
+            [testutil.arvados_node_mock(1),
+             testutil.arvados_node_mock(2)]
+        self.arv_factory.return_value = api_client
+
         self.cloud_factory = mock.MagicMock(name='cloud_mock')
         self.cloud_factory().node_start_time.return_value = time.time()
         self.cloud_updates = mock.MagicMock(name='updates_mock')
         self.timer = testutil.MockTimer(deliver_immediately=False)
         self.cloud_factory().node_id.side_effect = lambda node: node.id
+        self.cloud_factory().broken.return_value = False
 
         self.node_setup = mock.MagicMock(name='setup_mock')
         self.node_setup.start.side_effect = self.mock_node_start
@@ -76,22 +118,24 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             min_nodes, max_nodes, 600, 1800, 3600,
             self.node_setup, self.node_shutdown,
             max_total_price=max_total_price).proxy()
-        if cloud_nodes is not None:
-            self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
         if arvados_nodes is not None:
             self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
+        if cloud_nodes is not None:
+            self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
         if want_sizes is not None:
             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
 
     def monitor_list(self):
-        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
+        return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
 
-    def monitored_arvados_nodes(self):
+    def monitored_arvados_nodes(self, include_unpaired=True):
         pairings = []
         for future in [actor.proxy().arvados_node
                        for actor in self.monitor_list()]:
             try:
-                pairings.append(future.get(self.TIMEOUT))
+                g = future.get(self.TIMEOUT)
+                if g or include_unpaired:
+                    pairings.append(g)
             except pykka.ActorDeadError:
                 pass
         return pairings
@@ -99,6 +143,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def alive_monitor_count(self):
         return len(self.monitored_arvados_nodes())
 
+    def paired_monitor_count(self):
+        return len(self.monitored_arvados_nodes(False))
+
     def assertShutdownCancellable(self, expected=True):
         self.assertTrue(self.node_shutdown.start.called)
         self.assertIs(expected,
@@ -108,17 +155,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_easy_node_creation(self):
         size = testutil.MockSize(1)
         self.make_daemon(want_sizes=[size])
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: self.node_setup.start.called)
+        self.assertIn('node_quota', status.tracker._latest)
 
     def check_monitors_arvados_nodes(self, *arv_nodes):
-        self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
+        self.assertwait(lambda: self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes()))
 
     def test_node_pairing(self):
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon([cloud_node], [arv_node])
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_node_pairing_after_arvados_update(self):
@@ -127,26 +173,26 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          [testutil.arvados_node_mock(1, ip_address=None)])
         arv_node = testutil.arvados_node_mock(2)
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_arvados_node_un_and_re_paired(self):
         # We need to create the Arvados node mock after spinning up the daemon
         # to make sure it's new enough to pair with the cloud node.
-        self.make_daemon([testutil.cloud_node_mock(3)], arvados_nodes=None)
+        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
+                         arvados_nodes=None)
         arv_node = testutil.arvados_node_mock(3)
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
         self.check_monitors_arvados_nodes(arv_node)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.assertEqual(0, self.alive_monitor_count())
+        self.busywait(lambda: 0 == self.alive_monitor_count())
         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_old_arvados_node_not_double_assigned(self):
         arv_node = testutil.arvados_node_mock(3, age=9000)
         size = testutil.MockSize(3)
-        self.make_daemon(arvados_nodes=[arv_node], avail_sizes=[(size, {"cores":1})])
+        self.make_daemon(arvados_nodes=[arv_node],
+                         avail_sizes=[(size, {"cores":1})])
         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -157,65 +203,97 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertIn(None, used_nodes)
 
     def test_node_count_satisfied(self):
-        self.make_daemon([testutil.cloud_node_mock()],
+        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
                          want_sizes=[testutil.MockSize(1)])
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
+
+    def test_select_stale_node_records_with_slot_numbers_first(self):
+        """
+        Stale node records with slot_number assigned can exist when
+        clean_arvados_node() isn't executed after a node shutdown, for
+        various reasons.
+        NodeManagerDaemonActor should use these stale node records first, so
+        that they don't accumulate unused, reducing the slots available.
+        """
+        size = testutil.MockSize(1)
+        a_long_time_ago = '1970-01-01T01:02:03.04050607Z'
+        arvados_nodes = []
+        for n in range(9):
+            # Add several stale node records without slot_number assigned
+            arvados_nodes.append(
+                testutil.arvados_node_mock(
+                    n+1,
+                    slot_number=None,
+                    modified_at=a_long_time_ago))
+        # Add one record with slot_number assigned; it should be the
+        # first one selected
+        arv_node = testutil.arvados_node_mock(
+            123,
+            modified_at=a_long_time_ago)
+        arvados_nodes.append(arv_node)
+        cloud_node = testutil.cloud_node_mock(125, size=size)
+        self.make_daemon(cloud_nodes=[cloud_node],
+                         arvados_nodes=arvados_nodes)
+        arvados_nodes_tracker = self.daemon.arvados_nodes.get()
+        # Here, find_stale_node() should return the node record with
+        # the slot_number assigned.
+        self.assertEqual(arv_node,
+                         arvados_nodes_tracker.find_stale_node(3601))
 
     def test_dont_count_missing_as_busy(self):
         size = testutil.MockSize(1)
-        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1),
-                                      testutil.cloud_node_mock(2)],
+        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
+                                      testutil.cloud_node_mock(2, size=size)],
                          arvados_nodes=[testutil.arvados_node_mock(1),
-                                      testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
+                                        testutil.arvados_node_mock(
+                                            2,
+                                            last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size, size])
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: 2 == self.alive_monitor_count())
+        self.busywait(lambda: self.node_setup.start.called)
 
     def test_missing_counts_towards_max(self):
         size = testutil.MockSize(1)
-        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1),
-                                      testutil.cloud_node_mock(2)],
+        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
+                                      testutil.cloud_node_mock(2, size=size)],
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size, size],
                          max_nodes=2)
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_excess_counts_missing(self):
         size = testutil.MockSize(1)
-        cloud_nodes = [testutil.cloud_node_mock(1), testutil.cloud_node_mock(2)]
+        cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
         self.make_daemon(cloud_nodes=cloud_nodes,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
 
     def test_missing_shutdown_not_excess(self):
         size = testutil.MockSize(1)
-        cloud_nodes = [testutil.cloud_node_mock(1), testutil.cloud_node_mock(2)]
+        cloud_nodes = [testutil.cloud_node_mock(1, size=size), testutil.cloud_node_mock(2, size=size)]
         self.make_daemon(cloud_nodes=cloud_nodes,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         get_cloud_node = mock.MagicMock(name="get_cloud_node")
         get_cloud_node.get.return_value = cloud_nodes[1]
         mock_node_monitor = mock.MagicMock()
         mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
 
-        self.daemon.shutdowns.get()[cloud_nodes[1].id] = mock_shutdown.proxy()
-        self.daemon.sizes_booting_shutdown.get()[cloud_nodes[1].id] = size
+        self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
 
-        self.assertEqual(2, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.alive_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
-        self.assertEqual(1, self.node_shutdown.start.call_count)
+        self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
 
     def test_booting_nodes_counted(self):
         cloud_node = testutil.cloud_node_mock(1)
@@ -225,16 +303,15 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.max_nodes.get(self.TIMEOUT)
         self.assertTrue(self.node_setup.start.called)
         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
 
     def test_boot_new_node_when_all_nodes_busy(self):
         size = testutil.MockSize(2)
         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
                          [size], avail_sizes=[(size, {"cores":1})])
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
+        self.assertwait(lambda: self.assertEqual(1, self.node_setup.start.called))
 
     def test_boot_new_node_below_min_nodes(self):
         min_size = testutil.MockSize(1)
@@ -251,7 +328,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                           for call in self.node_setup.start.call_args_list])
 
     def test_no_new_node_when_ge_min_nodes_busy(self):
-        cloud_nodes = [testutil.cloud_node_mock(n) for n in range(1, 4)]
+        size = testutil.MockSize(2)
+        cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in range(1, 4)]
         arv_nodes = [testutil.arvados_node_mock(n, job_uuid=True)
                      for n in range(1, 4)]
         self.make_daemon(cloud_nodes, arv_nodes, [], min_nodes=2)
@@ -259,9 +337,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertEqual(0, self.node_setup.start.call_count)
 
     def test_no_new_node_when_max_nodes_busy(self):
-        self.make_daemon([testutil.cloud_node_mock(3)],
-                         [testutil.arvados_node_mock(3, job_uuid=True)],
-                         [testutil.MockSize(3)],
+        size = testutil.MockSize(3)
+        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(3)],
+                         arvados_nodes=[testutil.arvados_node_mock(3, job_uuid=True)],
+                         want_sizes=[size],
                          max_nodes=1)
         self.stop_proxy(self.daemon)
         self.assertFalse(self.node_setup.start.called)
@@ -269,6 +348,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def start_node_boot(self, cloud_node=None, arv_node=None, id_num=1):
         if cloud_node is None:
             cloud_node = testutil.cloud_node_mock(id_num)
+        id_num = int(cloud_node.id)
         if arv_node is None:
             arv_node = testutil.arvados_node_mock(id_num)
         self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
@@ -283,12 +363,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(4)
         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
         setup = self.start_node_boot(cloud_node, arv_node)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
-        self.daemon.update_cloud_nodes([cloud_node])
         self.daemon.update_arvados_nodes([arv_node])
+        self.daemon.update_cloud_nodes([cloud_node])
+        self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
         self.daemon.update_server_wishlist(
-            [testutil.MockSize(1)]).get(self.TIMEOUT)
+            [testutil.MockSize(4)]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
         self.assertEqual(2, self.node_setup.start.call_count)
 
@@ -299,13 +380,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(1)
         setup = self.start_node_boot(cloud_node)
         self.daemon.update_cloud_nodes([cloud_node])
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
 
     def test_no_duplication_when_booted_node_listed(self):
         cloud_node = testutil.cloud_node_mock(2)
         setup = self.start_node_boot(cloud_node, id_num=2)
-        self.daemon.node_up(setup)
+        self.daemon.node_setup_finished(setup)
         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
 
@@ -314,14 +395,14 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         # even it doesn't appear in the listing (e.g., because of delays
         # propagating tags).
         setup = self.start_node_boot()
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
 
     def test_booted_unlisted_node_counted(self):
         setup = self.start_node_boot(id_num=1)
-        self.daemon.node_up(setup)
+        self.daemon.node_setup_finished(setup)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(1)]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -329,19 +410,25 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_booted_node_can_shutdown(self):
         setup = self.start_node_boot()
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.update_server_wishlist([])
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
         self.assertTrue(self.node_shutdown.start.called,
                         "daemon did not shut down booted node on offer")
 
+        with test_status.TestServer() as srv:
+            self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
+            self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
+            self.assertEqual(0, srv.get_status().get('nodes_wish', None))
+
     def test_booted_node_lifecycle(self):
         cloud_node = testutil.cloud_node_mock(6)
         setup = self.start_node_boot(cloud_node, id_num=6)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.update_server_wishlist([])
@@ -350,6 +437,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         shutdown = self.node_shutdown.start().proxy()
         shutdown.cloud_node.get.return_value = cloud_node
         self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+        self.daemon.update_cloud_nodes([])
         self.assertTrue(shutdown.stop.called,
                         "shutdown actor not stopped after finishing")
         self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
@@ -360,42 +448,66 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_setup.start.called,
                         "second node not started after booted node stopped")
 
+    def test_node_disappearing_during_shutdown(self):
+        cloud_node = testutil.cloud_node_mock(6)
+        setup = self.start_node_boot(cloud_node, id_num=6)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+        monitor = self.monitor_list()[0].proxy()
+        self.daemon.update_server_wishlist([])
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertShutdownCancellable(True)
+        shutdown = self.node_shutdown.start().proxy()
+        shutdown.cloud_node.get.return_value = cloud_node
+        # Simulate a successful but slow node destroy call: the cloud node
+        # list gets updated before the ShutdownActor finishes.
+        record = self.daemon.cloud_nodes.get().nodes.values()[0]
+        self.assertTrue(record.shutdown_actor is not None)
+        self.daemon.cloud_nodes.get().nodes.clear()
+        self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+        self.assertTrue(
+            record.shutdown_actor is not None,
+            "test was ineffective -- failed to simulate the race condition")
+
     def test_booted_node_shut_down_when_never_listed(self):
         setup = self.start_node_boot()
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.cloud_factory().node_start_time.return_value = time.time() - 3601
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.assertFalse(self.node_shutdown.start.called)
-        self.timer.deliver()
-        self.stop_proxy(self.daemon)
+        now = time.time()
+        self.monitor_list()[0].tell_proxy().consider_shutdown()
+        self.busywait(lambda: self.node_shutdown.start.called)
         self.assertShutdownCancellable(False)
 
     def test_booted_node_shut_down_when_never_paired(self):
         cloud_node = testutil.cloud_node_mock(2)
         setup = self.start_node_boot(cloud_node)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.cloud_factory().node_start_time.return_value = time.time() - 3601
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([cloud_node])
-        self.timer.deliver()
-        self.stop_proxy(self.daemon)
+        self.monitor_list()[0].tell_proxy().consider_shutdown()
+        self.busywait(lambda: self.node_shutdown.start.called)
         self.assertShutdownCancellable(False)
 
     def test_booted_node_shut_down_when_never_working(self):
         cloud_node = testutil.cloud_node_mock(4)
         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
         setup = self.start_node_boot(cloud_node, arv_node)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
+        self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
         self.daemon.update_cloud_nodes([cloud_node])
-        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
-        self.timer.deliver()
-        self.stop_proxy(self.daemon)
+        self.busywait(lambda: self.node_shutdown.start.called)
         self.assertShutdownCancellable(False)
 
     def test_node_that_pairs_not_considered_failed_boot(self):
         cloud_node = testutil.cloud_node_mock(3)
         arv_node = testutil.arvados_node_mock(3)
         setup = self.start_node_boot(cloud_node, arv_node)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([cloud_node])
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
@@ -407,7 +519,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(5)
         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
         setup = self.start_node_boot(cloud_node, arv_node)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([cloud_node])
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
@@ -418,8 +530,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_booting_nodes_shut_down(self):
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
+        self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
 
     def test_all_booting_nodes_tried_to_shut_down(self):
         size = testutil.MockSize(2)
@@ -441,9 +552,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_shutdown_declined_at_wishlist_capacity(self):
         cloud_node = testutil.cloud_node_mock(1)
+        arv_node = testutil.arvados_node_mock(1)
         size = testutil.MockSize(1)
-        self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
-        self.assertEqual(1, self.alive_monitor_count())
+        self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -451,8 +563,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_shutdown_declined_below_min_nodes(self):
         cloud_node = testutil.cloud_node_mock(1)
-        self.make_daemon(cloud_nodes=[cloud_node], min_nodes=1)
-        self.assertEqual(1, self.alive_monitor_count())
+        arv_node = testutil.arvados_node_mock(1)
+        self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -460,18 +573,18 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_shutdown_accepted_below_capacity(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_shutdown.start.called)
+        self.busywait(lambda: self.node_shutdown.start.called)
 
     def test_shutdown_declined_when_idle_and_job_queued(self):
-        cloud_nodes = [testutil.cloud_node_mock(n) for n in [3, 4]]
+        size = testutil.MockSize(1)
+        cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [3, 4]]
         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                      testutil.arvados_node_mock(4, job_uuid=None)]
-        self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(1)])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.make_daemon(cloud_nodes, arv_nodes, [size])
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             monitor = mon_ref.proxy()
             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
@@ -490,36 +603,31 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = False
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
 
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = True
         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.assertEqual(0, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(0, self.paired_monitor_count()))
 
-    def test_broken_node_blackholed_after_cancelled_shutdown(self):
-        size = testutil.MockSize(8)
-        cloud_node = testutil.cloud_node_mock(8, size=size)
-        wishlist = [size]
-        self.make_daemon([cloud_node], [testutil.arvados_node_mock(8)],
-                         wishlist, avail_sizes=[(size, {"cores":1})])
+    def test_nodes_shutting_down_replaced_below_max_nodes(self):
+        size = testutil.MockSize(6)
+        cloud_node = testutil.cloud_node_mock(6, size=size)
+        self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
+                         avail_sizes=[(size, {"cores":1})])
         self.assertEqual(1, self.alive_monitor_count())
-        self.assertFalse(self.node_setup.start.called)
         monitor = self.monitor_list()[0].proxy()
-        shutdown_proxy = self.node_shutdown.start().proxy
-        shutdown_proxy().cloud_node.get.return_value = cloud_node
-        shutdown_proxy().success.get.return_value = False
-        shutdown_proxy().cancel_reason.get.return_value = self.node_shutdown.NODE_BROKEN
-        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
-        self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
-        self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
-        self.daemon.update_server_wishlist(wishlist).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.assertTrue(self.node_shutdown.start.called)
+        getmock = mock.MagicMock()
+        getmock.get.return_value = False
+        self.last_shutdown.cancel_shutdown.return_value = getmock
+        self.daemon.update_server_wishlist(
+            [testutil.MockSize(6)]).get(self.TIMEOUT)
+        self.busywait(lambda: self.node_setup.start.called)
 
-    def test_nodes_shutting_down_replaced_below_max_nodes(self):
+    def test_nodes_shutting_down_cancelled(self):
         size = testutil.MockSize(6)
         cloud_node = testutil.cloud_node_mock(6, size=size)
         self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
@@ -530,21 +638,19 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_shutdown.start.called)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(6)]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: self.last_shutdown.cancel_shutdown.called)
 
     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
         cloud_node = testutil.cloud_node_mock(7)
         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                          max_nodes=1)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.assertTrue(self.node_shutdown.start.called)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(7)]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_nodes_shutting_down_count_against_excess(self):
         size = testutil.MockSize(8)
@@ -552,7 +658,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
         self.make_daemon(cloud_nodes, arv_nodes, [size],
                          avail_sizes=[(size, {"cores":1})])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -563,7 +669,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         new_node.stop_if_no_cloud_node.reset_mock()
         self.daemon.shutdown().get(self.TIMEOUT)
         self.assertTrue(new_node.stop_if_no_cloud_node.called)
-        self.daemon.node_up(new_node).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
         self.assertTrue(new_node.stop.called)
         self.timer.deliver()
         self.assertTrue(
@@ -577,8 +683,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         size = testutil.MockSize(2)
         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
         self.timer.deliver()
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
 
     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
@@ -586,9 +691,28 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(
-            1, self.last_shutdown.stop.call_count)
+        self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
+
+    def test_idle_node_disappearing_clears_status_idle_time_counter(self):
+        size = testutil.MockSize(1)
+        status.tracker._idle_nodes = {}
+        cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
+        arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
+        self.make_daemon(cloud_nodes, arv_nodes, [size])
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
+        for mon_ref in self.monitor_list():
+            monitor = mon_ref.proxy()
+            if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
+                break
+        else:
+            self.fail("monitor for idle node not found")
+        self.assertEqual(1, status.tracker.get('nodes_idle'))
+        hostname = monitor.arvados_node.get()['hostname']
+        self.assertIn(hostname, status.tracker._idle_nodes)
+        # Simulate the node disappearing from the cloud node list
+        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
+        self.busywait(lambda: 0 == self.alive_monitor_count())
+        self.assertNotIn(hostname, status.tracker._idle_nodes)
 
     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
@@ -599,15 +723,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         # the ActorDeadError.
         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.last_shutdown.stop.call_count)
-
-    def busywait(self, f):
-        n = 0
-        while not f() and n < 10:
-            time.sleep(.1)
-            n += 1
-        self.assertTrue(f())
+        self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
 
     def test_node_create_two_sizes(self):
         small = testutil.MockSize(1)
@@ -634,7 +750,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         big = testutil.MockSize(2)
         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
                         (testutil.MockSize(2), {"cores":2})]
-        self.make_daemon(want_sizes=[small, small, small, big],
+        self.make_daemon(want_sizes=[small, small, big, small],
                          avail_sizes=avail_sizes, max_nodes=3)
 
         # the daemon runs in another thread, so we need to wait and see
@@ -648,6 +764,27 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertEqual(2, sizecounts[small.id])
         self.assertEqual(1, sizecounts[big.id])
 
+    def test_wishlist_ordering(self):
+        # Check that big nodes aren't prioritized: since #12199, containers are
+        # scheduled on specific node sizes.
+        small = testutil.MockSize(1)
+        big = testutil.MockSize(2)
+        avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+                        (testutil.MockSize(2), {"cores":2})]
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes, max_nodes=3)
+
+        # the daemon runs in another thread, so we need to wait and see
+        # if it does all the work we're expecting it to do before stopping it.
+        self.busywait(lambda: self.node_setup.start.call_count == 3)
+        booting = self.daemon.booting.get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in booting.itervalues():
+            sizecounts[b.cloud_size.get().id] += 1
+        self.assertEqual(3, sizecounts[small.id])
+        self.assertEqual(0, sizecounts[big.id])
+
     def test_wishlist_reconfigure(self):
         small = testutil.MockSize(1)
         big = testutil.MockSize(2)
@@ -661,7 +798,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(3)],
                          want_sizes=[small, small, big],
                          avail_sizes=avail_sizes)
-
+        self.assertwait(lambda: self.assertEqual(3, self.paired_monitor_count()))
         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
 
         self.assertEqual(0, self.node_shutdown.start.call_count)
@@ -670,12 +807,12 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             self.daemon.node_can_shutdown(c.actor)
 
         booting = self.daemon.booting.get()
-        shutdowns = self.daemon.shutdowns.get()
+        cloud_nodes = self.daemon.cloud_nodes.get()
 
-        self.stop_proxy(self.daemon)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
 
-        self.assertEqual(1, self.node_setup.start.call_count)
-        self.assertEqual(1, self.node_shutdown.start.call_count)
+        self.stop_proxy(self.daemon)
 
         # booting a new big node
         sizecounts = {a[0].id: 0 for a in avail_sizes}
@@ -686,8 +823,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         # shutting down a small node
         sizecounts = {a[0].id: 0 for a in avail_sizes}
-        for b in shutdowns.itervalues():
-            sizecounts[b.cloud_node.get().size.id] += 1
+        for b in cloud_nodes.nodes.itervalues():
+            if b.shutdown_actor is not None:
+                sizecounts[b.cloud_node.size.id] += 1
         self.assertEqual(1, sizecounts[small.id])
         self.assertEqual(0, sizecounts[big.id])