9161: Adjusting behavior to accommodate down/broken/missing nodes.
author Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 13 May 2016 18:26:30 +0000 (14:26 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 13 May 2016 18:26:30 +0000 (14:26 -0400)
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_daemon.py

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index b43e8a38ba0fd1cf2f07d25d20f2fc61b7202994..8674f168f7aff6dd5840d5745cf2a2490e18ea49 100644
@@ -88,7 +88,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     Manager to handle).
     """
     def __init__(self, timer_actor, arvados_client, cloud_client,
-                 cloud_size, arvados_node,
+                 cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
             cloud_client, arvados_client, timer_actor,
@@ -96,7 +96,16 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
-        self._later.prepare_arvados_node(arvados_node)
+        if arvados_node is None:
+            self._later.create_arvados_node()
+        else:
+            self._later.prepare_arvados_node(arvados_node)
+
+    @ComputeNodeStateChangeBase._finish_on_exception
+    @RetryMixin._retry(config.ARVADOS_ERRORS)
+    def create_arvados_node(self):
+        self.arvados_node = self._arvados.nodes().create(body={}).execute()
+        self._later.create_cloud_node()
 
     @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
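With arvados_node now optional, the daemon can start a setup actor before any Arvados node record exists; the actor calls nodes().create itself and then proceeds to create_cloud_node. A minimal usage sketch, assuming placeholder timer, arv, cloud, and size objects (they are not part of this commit):

    # Illustrative only: mirrors how the daemon starts the actor below.
    setup = ComputeNodeSetupActor.start(
        timer_actor=timer,        # timer actor shared with the other actors
        arvados_client=arv,       # Arvados API client
        cloud_client=cloud,       # cloud driver wrapper
        cloud_size=size).proxy()  # arvados_node omitted: the actor runs
                                  # arv.nodes().create(body={}).execute() itself
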
@@ -350,7 +359,8 @@ class ComputeNodeMonitorActor(config.actor_class):
         if (state == 'down' and
             self.arvados_node['first_ping_at'] and
             timestamp_fresh(self.cloud_node_start_time,
-                            self.boot_fail_after)):
+                            self.boot_fail_after) and
+            not self._cloud.broken(self.cloud_node)):
             state = 'idle'
 
         # "missing" means last_ping_at is stale, this should be
@@ -361,6 +371,7 @@ class ComputeNodeMonitorActor(config.actor_class):
         result = state in states
         if state == 'idle':
             result = result and not self.arvados_node['job_uuid']
+
         return result
 
     def shutdown_eligible(self):
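Paraphrasing the check above: an Arvados state of 'down' is treated as 'idle' while the node's boot window is still open, unless the cloud provider reports the node broken. A standalone sketch of that rule, assuming timestamp_fresh is in scope as it is in this module:

    def effective_state(state, arvados_node, cloud, cloud_node,
                        cloud_node_start_time, boot_fail_after):
        # 'down' inside the boot grace period is tolerated -- the node may
        # still come up -- but not if the cloud itself says it is broken.
        if (state == 'down' and
                arvados_node['first_ping_at'] and
                timestamp_fresh(cloud_node_start_time, boot_fail_after) and
                not cloud.broken(cloud_node)):
            return 'idle'
        return state
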
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index c4b0f3bff7fbe294d2620f5bbf48a9e33bfe2f2e..366c1f8b123fcb47331229fedf0fb3e1028fbbd1 100644
@@ -254,11 +254,12 @@ class NodeManagerDaemonActor(actor_class):
         return sum(1 for down in
                    pykka.get_all(rec.actor.in_state('down') for rec in
                                  self.cloud_nodes.nodes.itervalues()
-                                 if size is None or rec.cloud_node.size.id == size.id)
+                                 if ((size is None or rec.cloud_node.size.id == size.id) and
+                                     rec.cloud_node.id not in self.shutdowns))
                    if down)
 
     def _nodes_up(self, size):
-        up = (self._nodes_booting(size) + self._nodes_unpaired(size) + self._nodes_paired(size)) - self._nodes_down(size)
+        up = (self._nodes_booting(size) + self._nodes_unpaired(size) + self._nodes_paired(size)) - (self._nodes_down(size) + self._size_shutdowns(size))
         return up
 
     def _total_price(self):
@@ -280,17 +281,12 @@ class NodeManagerDaemonActor(actor_class):
         return sum(1 for c in self.last_wishlist if c.id == size.id)
 
     def _size_shutdowns(self, size):
-        sh = 0
-        for c in self.shutdowns.iterkeys():
-            try:
-                if self.sizes_booting_shutdown[c].id == size.id:
-                    sh += 1
-            except pykka.ActorDeadError:
-                pass
-        return sh
+        return sum(1
+                  for c in self.shutdowns.iterkeys()
+                  if size is None or self.sizes_booting_shutdown[c].id == size.id)
 
     def _nodes_wanted(self, size):
-        total_up_count = self._nodes_up(None)
+        total_up_count = self._nodes_up(None) + self._nodes_down(None)
         under_min = self.min_nodes - total_up_count
         over_max = total_up_count - self.max_nodes
         total_price = self._total_price()
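A worked example with hypothetical counts shows the effect of the new bookkeeping: nodes that are down or already queued for shutdown no longer count as up, while _nodes_wanted checks the min/max limits against up plus down nodes, so a dead node does not block booting its replacement.

    # Hypothetical counts, purely to illustrate the arithmetic above.
    booting, unpaired, paired = 1, 1, 4
    down, pending_shutdowns = 1, 2    # 'down' excludes nodes already in shutdowns

    nodes_up = (booting + unpaired + paired) - (down + pending_shutdowns)  # -> 3
    total_for_min_max = nodes_up + down                                    # -> 4
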
@@ -370,23 +366,17 @@ class NodeManagerDaemonActor(actor_class):
         nodes_wanted = self._nodes_wanted(cloud_size)
         if nodes_wanted < 1:
             return None
+        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
         self._logger.info("Want %i more %s nodes.  Booting a node.",
                           nodes_wanted, cloud_size.name)
-
-        arvados_client=self._new_arvados()
-        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
-        if not arvados_node:
-            arvados_node = arvados_client.nodes().create(body={}).execute()
-            self._register_arvados_node(arvados_node["uuid"], arvados_node)
-
         new_setup = self._node_setup.start(
             timer_actor=self._timer,
-            arvados_client=arvados_client,
+            arvados_client=self._new_arvados(),
             arvados_node=arvados_node,
             cloud_client=self._new_cloud(),
             cloud_size=cloud_size).proxy()
-        self.booting[arvados_node['uuid']] = new_setup
-        self.sizes_booting_shutdown[arvados_node['uuid']] = cloud_size
+        self.booting[new_setup.actor_ref.actor_urn] = new_setup
+        self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
 
         if arvados_node is not None:
             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
@@ -409,10 +399,8 @@ class NodeManagerDaemonActor(actor_class):
         if cloud_node is not None:
             # Node creation succeeded.  Update cloud node list.
             self._register_cloud_node(cloud_node)
-            arvuuid = arvados_node["uuid"]
-            if arvuuid in self.booting:
-                del self.booting[arvuuid]
-                del self.sizes_booting_shutdown[arvuuid]
+        del self.booting[setup_proxy.actor_ref.actor_urn]
+        del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
 
     @_check_poll_freshness
     def stop_booting_node(self, size):
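Because a setup actor may now start before any Arvados node record exists, the daemon can no longer key its bookkeeping on the Arvados UUID. The two hunks above switch the booting and sizes_booting_shutdown maps to the setup actor's URN, which pykka assigns as soon as the actor starts. A paraphrase of the new keying (not additional code in the commit):

    key = new_setup.actor_ref.actor_urn       # available before nodes().create runs
    self.booting[key] = new_setup
    self.sizes_booting_shutdown[key] = cloud_size

    # ...and when the setup actor reports completion in node_up():
    key = setup_proxy.actor_ref.actor_urn
    del self.booting[key]
    del self.sizes_booting_shutdown[key]
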
@@ -444,13 +432,15 @@ class NodeManagerDaemonActor(actor_class):
     @_check_poll_freshness
     def node_can_shutdown(self, node_actor):
         if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
+            print("excess")
             self._begin_node_shutdown(node_actor, cancellable=True)
         elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
             # Node is unpaired, which means it probably exceeded its booting
             # grace period without a ping, so shut it down so we can boot a new
             # node in its place.
+            print("unpaired")
             self._begin_node_shutdown(node_actor, cancellable=False)
-        elif node_actor.in_state('down'):
+        elif node_actor.in_state('down').get():
             # Node is down and unlikely to come back.
             self._begin_node_shutdown(node_actor, cancellable=False)
 
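The last branch also fixes a subtle pykka issue: node_actor is an actor proxy, so in_state('down') returns a pykka.Future, which is always truthy. A minimal illustration (hypothetical node_actor, not part of the commit):

    future = node_actor.in_state('down')   # a pykka.Future, not a bool
    bool(future)    # always True -- the old `elif` fired even for healthy nodes
    future.get()    # blocks for the actual boolean computed by the actor
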
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index 8def8535cf488e7deb028c6f01d0f9dc45ba3ffa..bf86e69ad576bc050e8c467b9b1fb65072891938 100644
@@ -356,7 +356,8 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
     def test_shutdown_without_arvados_node(self):
         self.make_actor(start_time=0)
         self.shutdowns._set_state(True, 600)
-        self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT), (True, "node state is ('unpaired', 'open', 'boot exceeded', 'idle exceeded')"))
+        self.assertEquals((True, "node state is ('unpaired', 'open', 'boot exceeded', 'idle exceeded')"),
+                          self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_shutdown_missing(self):
         arv_node = testutil.arvados_node_mock(10, job_uuid=None,
@@ -364,7 +365,8 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
                                               last_ping_at='1970-01-01T01:02:03.04050607Z')
         self.make_actor(10, arv_node)
         self.shutdowns._set_state(True, 600)
-        self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT), (True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"))
+        self.assertEquals((True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"),
+                          self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_shutdown_running_broken(self):
         arv_node = testutil.arvados_node_mock(12, job_uuid=None,
@@ -372,7 +374,8 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.make_actor(12, arv_node)
         self.shutdowns._set_state(True, 600)
         self.cloud_client.broken.return_value = True
-        self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT), (True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"))
+        self.assertEquals((True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"),
+                          self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_shutdown_missing_broken(self):
         arv_node = testutil.arvados_node_mock(11, job_uuid=None,
@@ -385,27 +388,27 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
 
     def test_no_shutdown_when_window_closed(self):
         self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
-        self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
-                          (False, "node state is ('idle', 'closed', 'boot wait', 'idle exceeded')"))
+        self.assertEquals((False, "node state is ('idle', 'closed', 'boot wait', 'idle exceeded')"),
+                          self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_no_shutdown_when_node_running_job(self):
         self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
         self.shutdowns._set_state(True, 600)
-        self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
-                          (False, "node state is ('busy', 'open', 'boot wait', 'idle exceeded')"))
+        self.assertEquals((False, "node state is ('busy', 'open', 'boot wait', 'idle exceeded')"),
+                          self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_no_shutdown_when_node_state_unknown(self):
         self.make_actor(5, testutil.arvados_node_mock(
             5, crunch_worker_state=None))
         self.shutdowns._set_state(True, 600)
-        self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
-                          (False, "node is paired but crunch_worker_state is 'None'"))
+        self.assertEquals((False, "node is paired but crunch_worker_state is 'None'"),
+                          self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_no_shutdown_when_node_state_stale(self):
         self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
         self.shutdowns._set_state(True, 600)
-        self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
-                          (False, "node state is stale"))
+        self.assertEquals((False, "node state is stale"),
+                          self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_arvados_node_match(self):
         self.make_actor(2)
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 3cc3e78cf349e31308f43745741e21a6653d94a1..73b69d015c0f5a427f517b74768db293fb857b8f 100644
@@ -292,8 +292,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         setup = self.start_node_boot(cloud_node, arv_node)
         self.daemon.node_up(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
-        self.daemon.update_cloud_nodes([cloud_node])
         self.daemon.update_arvados_nodes([arv_node])
+        self.daemon.update_cloud_nodes([cloud_node])
+        self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
         self.daemon.update_server_wishlist(
             [testutil.MockSize(1)]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -316,16 +317,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
 
-    def test_node_counted_after_boot_with_slow_listing(self):
-        # Test that, after we boot a compute node, we assume it exists
-        # even it doesn't appear in the listing (e.g., because of delays
-        # propagating tags).
-        setup = self.start_node_boot()
-        self.daemon.node_up(setup).get(self.TIMEOUT)
-        self.assertEqual(1, self.alive_monitor_count())
-        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.assertEqual(1, self.alive_monitor_count())
-
     def test_booted_unlisted_node_counted(self):
         setup = self.start_node_boot(id_num=1)
         self.daemon.node_up(setup)
@@ -455,8 +446,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_shutdown_declined_at_wishlist_capacity(self):
         cloud_node = testutil.cloud_node_mock(1)
+        arv_node = testutil.arvados_node_mock(1)
         size = testutil.MockSize(1)
-        self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
+        self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
         self.assertEqual(1, self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
@@ -465,7 +457,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_shutdown_declined_below_min_nodes(self):
         cloud_node = testutil.cloud_node_mock(1)
-        self.make_daemon(cloud_nodes=[cloud_node], min_nodes=1)
+        arv_node = testutil.arvados_node_mock(1)
+        self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
         self.assertEqual(1, self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)