Merge branch 'master' into 13804-no-shutdown-wanted-nodes
[arvados.git] / services / nodemanager / tests / test_daemon.py
index 6012cfc6b72a9804d1a5a22dc727b5e11389a3ce..1b6e4ca8da4aa24bfb45f8382e7b5d7700cd2bf2 100644 (file)
@@ -17,18 +17,32 @@ from arvnodeman.jobqueue import ServerCalculator
 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
 from . import test_status
+from . import pykka_timeout
 import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
 
+    def assertwait(self, f, timeout=pykka_timeout*2):
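+        # Retry f() until it stops raising AssertionError or the timeout
+        # expires, pinging the daemon between attempts.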
+        deadline = time.time() + timeout
+        while True:
+            try:
+                return f()
+            except AssertionError:
+                if time.time() > deadline:
+                    raise
+            time.sleep(.1)
+            self.daemon.ping().get(self.TIMEOUT)
+
     def busywait(self, f):
-        n = 0
-        while not f() and n < 200:
+        for n in xrange(200):
+            ok = f()
+            if ok:
+                return
             time.sleep(.1)
             self.daemon.ping().get(self.TIMEOUT)
-            n += 1
-        self.assertTrue(f())
+        self.assertTrue(ok)  # ok is always falsy here, but not necessarily False
 
     def mock_node_start(self, **kwargs):
         # Make sure that every time the daemon starts a setup actor,
@@ -77,8 +91,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         self.arv_factory = mock.MagicMock(name='arvados_mock')
         api_client = mock.MagicMock(name='api_client')
-        api_client.nodes().create().execute.side_effect = [testutil.arvados_node_mock(1),
-                                                           testutil.arvados_node_mock(2)]
+        api_client.nodes().create().execute.side_effect = \
+            [testutil.arvados_node_mock(1),
+             testutil.arvados_node_mock(2)]
         self.arv_factory.return_value = api_client
 
         self.cloud_factory = mock.MagicMock(name='cloud_mock')
@@ -141,10 +156,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         size = testutil.MockSize(1)
         self.make_daemon(want_sizes=[size])
         self.busywait(lambda: self.node_setup.start.called)
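+        # The status tracker should now include a node_quota entry.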
+        self.assertIn('node_quota', status.tracker._latest)
 
     def check_monitors_arvados_nodes(self, *arv_nodes):
-        self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
-        self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
+        self.assertwait(lambda: self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes()))
 
     def test_node_pairing(self):
         cloud_node = testutil.cloud_node_mock(1)
@@ -192,6 +207,39 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          want_sizes=[testutil.MockSize(1)])
         self.busywait(lambda: not self.node_setup.start.called)
 
+    def test_select_stale_node_records_with_slot_numbers_first(self):
+        """
+        Stale node records with slot_number assigned can exist when
+        clean_arvados_node() isn't executed after a node shutdown, for
+        various reasons.
+        NodeManagerDaemonActor should use these stale node records first, so
+        that they don't accumulate unused, reducing the slots available.
+        """
+        size = testutil.MockSize(1)
+        a_long_time_ago = '1970-01-01T01:02:03.04050607Z'
+        arvados_nodes = []
+        for n in range(9):
+            # Add several stale node records without slot_number assigned
+            arvados_nodes.append(
+                testutil.arvados_node_mock(
+                    n+1,
+                    slot_number=None,
+                    modified_at=a_long_time_ago))
+        # Add one record with slot_number assigned; it should be the
+        # first one selected.
+        arv_node = testutil.arvados_node_mock(
+            123,
+            modified_at=a_long_time_ago)
+        arvados_nodes.append(arv_node)
+        cloud_node = testutil.cloud_node_mock(125, size=size)
+        self.make_daemon(cloud_nodes=[cloud_node],
+                         arvados_nodes=arvados_nodes)
+        arvados_nodes_tracker = self.daemon.arvados_nodes.get()
+        # Here, find_stale_node() should return the node record with
+        # the slot_number assigned.
+        self.assertEqual(arv_node,
+                         arvados_nodes_tracker.find_stale_node(3601))
+
     def test_dont_count_missing_as_busy(self):
         size = testutil.MockSize(1)
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
@@ -221,7 +269,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-        self.busywait(lambda: 2 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -233,7 +281,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         get_cloud_node = mock.MagicMock(name="get_cloud_node")
         get_cloud_node.get.return_value = cloud_nodes[1]
         mock_node_monitor = mock.MagicMock()
@@ -242,7 +290,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
 
-        self.busywait(lambda: 2 == self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.alive_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
@@ -262,8 +310,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
                          [size], avail_sizes=[(size, {"cores":1})])
-        self.busywait(lambda: 1 == self.paired_monitor_count())
-        self.busywait(lambda: self.node_setup.start.called)
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
+        self.assertwait(lambda: self.assertTrue(self.node_setup.start.called))
 
     def test_boot_new_node_below_min_nodes(self):
         min_size = testutil.MockSize(1)
@@ -400,6 +448,27 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_setup.start.called,
                         "second node not started after booted node stopped")
 
+    def test_node_disappearing_during_shutdown(self):
+        cloud_node = testutil.cloud_node_mock(6)
+        setup = self.start_node_boot(cloud_node, id_num=6)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+        monitor = self.monitor_list()[0].proxy()
+        self.daemon.update_server_wishlist([])
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertShutdownCancellable(True)
+        shutdown = self.node_shutdown.start().proxy()
+        shutdown.cloud_node.get.return_value = cloud_node
+        # Simulate a successful but slow node destroy call: the cloud node
+        # list gets updated before the ShutdownActor finishes.
+        record = self.daemon.cloud_nodes.get().nodes.values()[0]
+        self.assertTrue(record.shutdown_actor is not None)
+        self.daemon.cloud_nodes.get().nodes.clear()
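+        # Finishing the shutdown must not fail even though the record is
+        # no longer in the daemon's cloud node list.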
+        self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+        self.assertTrue(
+            record.shutdown_actor is not None,
+            "test was ineffective -- failed to simulate the race condition")
+
     def test_booted_node_shut_down_when_never_listed(self):
         setup = self.start_node_boot()
         self.cloud_factory().node_start_time.return_value = time.time() - 3601
@@ -486,7 +555,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(1)
         size = testutil.MockSize(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -496,7 +565,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -515,7 +584,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                      testutil.arvados_node_mock(4, job_uuid=None)]
         self.make_daemon(cloud_nodes, arv_nodes, [size])
-        self.busywait(lambda: 2 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             monitor = mon_ref.proxy()
             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
@@ -534,13 +603,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = False
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
 
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = True
         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.busywait(lambda: 0 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(0, self.paired_monitor_count()))
 
     def test_nodes_shutting_down_replaced_below_max_nodes(self):
         size = testutil.MockSize(6)
@@ -551,15 +620,31 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.assertTrue(self.node_shutdown.start.called)
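+        # Pretend the pending shutdown cannot be cancelled, so the wishlist
+        # update below has to boot a replacement node.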
+        getmock = mock.MagicMock()
+        getmock.get.return_value = False
+        self.last_shutdown.cancel_shutdown.return_value = getmock
         self.daemon.update_server_wishlist(
             [testutil.MockSize(6)]).get(self.TIMEOUT)
         self.busywait(lambda: self.node_setup.start.called)
 
+    def test_nodes_shutting_down_cancelled(self):
+        size = testutil.MockSize(6)
+        cloud_node = testutil.cloud_node_mock(6, size=size)
+        self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
+                         avail_sizes=[(size, {"cores":1})])
+        self.assertEqual(1, self.alive_monitor_count())
+        monitor = self.monitor_list()[0].proxy()
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertTrue(self.node_shutdown.start.called)
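+        # The wishlist update should try to cancel the pending shutdown.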
+        self.daemon.update_server_wishlist(
+            [testutil.MockSize(6)]).get(self.TIMEOUT)
+        self.busywait(lambda: self.last_shutdown.cancel_shutdown.called)
+
     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
         cloud_node = testutil.cloud_node_mock(7)
         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                          max_nodes=1)
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.assertTrue(self.node_shutdown.start.called)
@@ -573,7 +658,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
         self.make_daemon(cloud_nodes, arv_nodes, [size],
                          avail_sizes=[(size, {"cores":1})])
-        self.busywait(lambda: 2 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -608,6 +693,27 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
 
+    def test_idle_node_disappearing_clears_status_idle_time_counter(self):
+        size = testutil.MockSize(1)
+        status.tracker._idle_nodes = {}
+        cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
+        arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
+        self.make_daemon(cloud_nodes, arv_nodes, [size])
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
+        for mon_ref in self.monitor_list():
+            monitor = mon_ref.proxy()
+            if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
+                break
+        else:
+            self.fail("monitor for idle node not found")
+        self.assertEqual(1, status.tracker.get('nodes_idle'))
+        hostname = monitor.arvados_node.get()['hostname']
+        self.assertIn(hostname, status.tracker._idle_nodes)
+        # Simulate the node disappearing from the cloud node list
+        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
+        self.busywait(lambda: 0 == self.alive_monitor_count())
+        self.assertNotIn(hostname, status.tracker._idle_nodes)
+
     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
         self.assertEqual(1, self.alive_monitor_count())
@@ -644,7 +750,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         big = testutil.MockSize(2)
         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
                         (testutil.MockSize(2), {"cores":2})]
-        self.make_daemon(want_sizes=[small, small, small, big],
+        self.make_daemon(want_sizes=[small, small, big, small],
                          avail_sizes=avail_sizes, max_nodes=3)
 
         # the daemon runs in another thread, so we need to wait and see
@@ -658,6 +764,27 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertEqual(2, sizecounts[small.id])
         self.assertEqual(1, sizecounts[big.id])
 
+    def test_wishlist_ordering(self):
+        # Check that big nodes aren't prioritized; since #12199, containers
+        # are scheduled on specific node sizes.
+        small = testutil.MockSize(1)
+        big = testutil.MockSize(2)
+        avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+                        (testutil.MockSize(2), {"cores":2})]
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes, max_nodes=3)
+
+        # the daemon runs in another thread, so we need to wait and see
+        # if it does all the work we're expecting it to do before stopping it.
+        self.busywait(lambda: self.node_setup.start.call_count == 3)
+        booting = self.daemon.booting.get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
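+        # Count how many booting nodes were requested for each size.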
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in booting.itervalues():
+            sizecounts[b.cloud_size.get().id] += 1
+        self.assertEqual(3, sizecounts[small.id])
+        self.assertEqual(0, sizecounts[big.id])
+
     def test_wishlist_reconfigure(self):
         small = testutil.MockSize(1)
         big = testutil.MockSize(2)
@@ -671,7 +798,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(3)],
                          want_sizes=[small, small, big],
                          avail_sizes=avail_sizes)
-
+        self.assertwait(lambda: self.assertEqual(3, self.paired_monitor_count()))
         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
 
         self.assertEqual(0, self.node_shutdown.start.call_count)