From ab3afbb684bc1b32577c2696e13882123bfff7d2 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 16 Jul 2018 17:19:53 -0400 Subject: [PATCH] 13804: Try to cancel pending shutdowns if nodes are needed Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- .../computenode/dispatch/__init__.py | 6 +- services/nodemanager/arvnodeman/daemon.py | 62 ++++++++++++------- .../nodemanager/tests/integration_test.py | 7 ++- 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py index d9b475b908..bb83a193fa 100644 --- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py +++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py @@ -243,12 +243,15 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase): return super(ComputeNodeShutdownActor, self)._finished() def cancel_shutdown(self, reason, **kwargs): + if not self.cancellable: + return False if self.cancel_reason is not None: # already cancelled - return + return False self.cancel_reason = reason self._logger.info("Shutdown cancelled: %s.", reason) self._finished(success_flag=False) + return True def _cancel_on_exception(orig_func): @functools.wraps(orig_func) @@ -282,6 +285,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase): self._logger.info("Starting shutdown") arv_node = self._arvados_node() if self._cloud.destroy_node(self.cloud_node): + self.cancellable = False self._logger.info("Shutdown success") if arv_node: self._later.clean_arvados_node(arv_node) diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py index 0d6fdfca9a..2e71b316df 100644 --- a/services/nodemanager/arvnodeman/daemon.py +++ b/services/nodemanager/arvnodeman/daemon.py @@ -390,22 +390,25 @@ class NodeManagerDaemonActor(actor_class): nodes_wanted = self._nodes_wanted(cloud_size) if nodes_wanted < 1: return None - arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after) - self._logger.info("Want %i more %s nodes. Booting a node.", - nodes_wanted, cloud_size.name) - new_setup = self._node_setup.start( - timer_actor=self._timer, - arvados_client=self._new_arvados(), - arvados_node=arvados_node, - cloud_client=self._new_cloud(), - cloud_size=self.server_calculator.find_size(cloud_size.id)) - self.booting[new_setup.actor_urn] = new_setup.proxy() - self.sizes_booting[new_setup.actor_urn] = cloud_size - - if arvados_node is not None: - self.arvados_nodes[arvados_node['uuid']].assignment_time = ( - time.time()) - new_setup.tell_proxy().subscribe(self._later.node_setup_finished) + + if not self.cancel_node_shutdown(cloud_size): + arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after) + self._logger.info("Want %i more %s nodes. Booting a node.", + nodes_wanted, cloud_size.name) + new_setup = self._node_setup.start( + timer_actor=self._timer, + arvados_client=self._new_arvados(), + arvados_node=arvados_node, + cloud_client=self._new_cloud(), + cloud_size=self.server_calculator.find_size(cloud_size.id)) + self.booting[new_setup.actor_urn] = new_setup.proxy() + self.sizes_booting[new_setup.actor_urn] = cloud_size + + if arvados_node is not None: + self.arvados_nodes[arvados_node['uuid']].assignment_time = ( + time.time()) + new_setup.tell_proxy().subscribe(self._later.node_setup_finished) + if nodes_wanted > 1: self._later.start_node(cloud_size) @@ -456,13 +459,28 @@ class NodeManagerDaemonActor(actor_class): if (nodes_excess < 1) or not self.booting: return None for key, node in self.booting.iteritems(): - if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get(): - del self.booting[key] - del self.sizes_booting[key] + try: + if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get(2): + del self.booting[key] + del self.sizes_booting[key] + if nodes_excess > 1: + self._later.stop_booting_node(size) + return + except pykka.Timeout: + pass - if nodes_excess > 1: - self._later.stop_booting_node(size) - break + @_check_poll_freshness + def cancel_node_shutdown(self, size): + # Go through shutdown actors and see if there are any of the appropriate size that can be cancelled + for record in self.cloud_nodes.nodes.itervalues(): + try: + if (record.shutdown_actor is not None and + record.size.id == size.id and + record.shutdown_actor.cancel_shutdown("Node size is in wishlist").get(2)): + return True + except (pykka.ActorDeadError, pykka.Timeout) as e: + pass + return False def _begin_node_shutdown(self, node_actor, cancellable): cloud_node_obj = node_actor.cloud_node.get() diff --git a/services/nodemanager/tests/integration_test.py b/services/nodemanager/tests/integration_test.py index 508e626639..284b361cea 100755 --- a/services/nodemanager/tests/integration_test.py +++ b/services/nodemanager/tests/integration_test.py @@ -124,8 +124,11 @@ def node_busy(g): def node_shutdown(g): global compute_nodes - del compute_nodes[g.group(1)] - return 0 + if g.group(1) in compute_nodes: + del compute_nodes[g.group(1)] + return 0 + else: + return 1 def jobs_req(g): global all_jobs -- 2.39.5