X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4cd89bd1767bece226c412ae7c9ea37669e8706b..29665e2d9a543bffb237d148c3484c03b03e30aa:/services/nodemanager/arvnodeman/daemon.py

diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index dcbc87ccc8..9bfee79b59 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -9,6 +9,7 @@ import time
 import pykka
 
 from . import computenode as cnode
+from . import status
 from .computenode import dispatch
 from .config import actor_class
 
@@ -232,7 +233,7 @@ class NodeManagerDaemonActor(actor_class):
     def try_pairing(self):
         for record in self.cloud_nodes.unpaired():
             for arv_rec in self.arvados_nodes.unpaired():
-                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
+                if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                     self._pair_nodes(record, arv_rec.arvados_node)
                     break
@@ -243,14 +244,27 @@ class NodeManagerDaemonActor(actor_class):
         return s
 
     def _node_states(self, size):
-        states = pykka.get_all(rec.actor.get_state()
-                               for rec in self.cloud_nodes.nodes.itervalues()
-                               if ((size is None or rec.cloud_node.size.id == size.id) and
-                                   rec.shutdown_actor is None))
-        states += ['shutdown' for rec in self.cloud_nodes.nodes.itervalues()
-                   if ((size is None or rec.cloud_node.size.id == size.id) and
-                       rec.shutdown_actor is not None)]
-        return states
+        proxy_states = []
+        states = []
+        for rec in self.cloud_nodes.nodes.itervalues():
+            if size is None or rec.cloud_node.size.id == size.id:
+                if rec.shutdown_actor is None and rec.actor is not None:
+                    proxy_states.append(rec.actor.get_state())
+                else:
+                    states.append("shutdown")
+        return states + pykka.get_all(proxy_states)
+
+    def _update_tracker(self):
+        updates = {
+            k: 0
+            for k in status.tracker.keys()
+            if k.startswith('nodes_')
+        }
+        for s in self._node_states(size=None):
+            updates.setdefault('nodes_'+s, 0)
+            updates['nodes_'+s] += 1
+        updates['nodes_wish'] = len(self.last_wishlist)
+        status.tracker.update(updates)
 
     def _state_counts(self, size):
         states = self._node_states(size)
@@ -335,7 +349,11 @@ class NodeManagerDaemonActor(actor_class):
                 elif (nodes_wanted < 0) and self.booting:
                     self._later.stop_booting_node(size)
             except Exception as e:
-                self._logger.exception("while calculating nodes wanted for size %s", size)
+                self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
+        try:
+            self._update_tracker()
+        except:
+            self._logger.exception("while updating tracker")
 
     def _check_poll_freshness(orig_func):
         """Decorator to inhibit a method when poll information is stale.
@@ -425,16 +443,25 @@ class NodeManagerDaemonActor(actor_class):
 
     @_check_poll_freshness
     def node_can_shutdown(self, node_actor):
-        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
-            self._begin_node_shutdown(node_actor, cancellable=True)
-        elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
-            # Node is unpaired, which means it probably exceeded its booting
-            # grace period without a ping, so shut it down so we can boot a new
-            # node in its place.
-            self._begin_node_shutdown(node_actor, cancellable=False)
-        elif node_actor.in_state('down').get():
-            # Node is down and unlikely to come back.
-            self._begin_node_shutdown(node_actor, cancellable=False)
+        try:
+            if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
+                self._begin_node_shutdown(node_actor, cancellable=True)
+            elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
+                # Node is unpaired, which means it probably exceeded its booting
+                # grace period without a ping, so shut it down so we can boot a new
+                # node in its place.
+                self._begin_node_shutdown(node_actor, cancellable=False)
+            elif node_actor.in_state('down').get():
+                # Node is down and unlikely to come back.
+                self._begin_node_shutdown(node_actor, cancellable=False)
+        except pykka.ActorDeadError as e:
+            # The monitor actor sends shutdown suggestions every time the
+            # node's state is updated, and these go into the daemon actor's
+            # message queue. It's possible that the node has already been shut
+            # down (which shuts down the node monitor actor). In that case,
+            # this message is stale and we'll get ActorDeadError when we try
+            # to access node_actor. Log the error.
+            self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
 
     def node_finished_shutdown(self, shutdown_actor):
         try:
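
Note on the rewritten _node_states(): instead of blocking on each monitor in turn, it gathers one pykka future per live monitor actor, substitutes the literal string 'shutdown' for records whose monitor is gone, and resolves everything in a single pykka.get_all() call; _update_tracker() then folds the resulting state list into the nodes_* counters. A minimal standalone sketch of that pattern follows; NodeMonitor is a hypothetical stand-in for the per-node monitor actor, not the real nodemanager class.

import pykka

class NodeMonitor(pykka.ThreadingActor):
    # Hypothetical stand-in for the per-node monitor actor; only
    # get_state() matters for this sketch.
    def __init__(self, state):
        super(NodeMonitor, self).__init__()
        self._state = state

    def get_state(self):
        return self._state

refs = [NodeMonitor.start(s) for s in ('idle', 'busy', 'idle')]

# Like _node_states(): one future per live monitor, plus a literal
# 'shutdown' for each node whose monitor actor is gone.
futures = [ref.proxy().get_state() for ref in refs]
states = ['shutdown'] + pykka.get_all(futures)

# Like _update_tracker(): fold the states into nodes_* counters.
counts = {}
for s in states:
    counts.setdefault('nodes_' + s, 0)
    counts['nodes_' + s] += 1
print(counts)  # e.g. {'nodes_shutdown': 1, 'nodes_idle': 2, 'nodes_busy': 1}

for ref in refs:
    ref.stop()

Collecting the futures first and resolving them together lets all the monitors answer concurrently, so one slow actor does not serialize the whole poll.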
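
Note on the new try/except in node_can_shutdown(): shutdown suggestions are ordinary messages in the daemon actor's queue, so by the time one is handled, the monitor actor that sent it may already have been stopped, and any call through node_actor then raises pykka.ActorDeadError. A sketch of that failure mode, using the same hypothetical NodeMonitor stand-in as above:

import pykka

class NodeMonitor(pykka.ThreadingActor):
    # Same hypothetical stand-in as above.
    def get_state(self):
        return 'down'

ref = NodeMonitor.start()
proxy = ref.proxy()
ref.stop()  # e.g. the node was shut down and its monitor stopped

try:
    # Handling a stale shutdown suggestion would do the equivalent
    # of this call on the now-dead actor.
    proxy.get_state().get()
except pykka.ActorDeadError as e:
    # Matches the daemon's handler: log and drop the stale message.
    print("ActorDeadError in node_can_shutdown: %s" % e)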