X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6f9bc5a295042fdcc0e51b193d3f31633d58c5d1..8be16cfc7e163cc96995be891e53050febfb1fca:/services/nodemanager/arvnodeman/daemon.py diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py index 266b66545f..9bfee79b59 100644 --- a/services/nodemanager/arvnodeman/daemon.py +++ b/services/nodemanager/arvnodeman/daemon.py @@ -9,6 +9,7 @@ import time import pykka from . import computenode as cnode +from . import status from .computenode import dispatch from .config import actor_class @@ -25,7 +26,6 @@ class _BaseNodeTracker(object): def __init__(self): self.nodes = {} self.orphans = {} - self._blacklist = set() # Proxy the methods listed below to self.nodes. def _proxy_method(name): @@ -44,9 +44,6 @@ class _BaseNodeTracker(object): def add(self, record): self.nodes[self.record_key(record)] = record - def blacklist(self, key): - self._blacklist.add(key) - def update_record(self, key, item): setattr(self.nodes[key], self.RECORD_ATTR, item) @@ -54,9 +51,7 @@ class _BaseNodeTracker(object): unseen = set(self.nodes.iterkeys()) for item in response: key = self.item_key(item) - if key in self._blacklist: - continue - elif key in unseen: + if key in unseen: unseen.remove(key) self.update_record(key, item) else: @@ -67,10 +62,6 @@ class _BaseNodeTracker(object): return (record for record in self.nodes.itervalues() if getattr(record, self.PAIR_ATTR) is None) - def paired(self): - return (record for record in self.nodes.itervalues() - if getattr(record, self.PAIR_ATTR) is not None) - class _CloudNodeTracker(_BaseNodeTracker): RECORD_ATTR = 'cloud_node' @@ -218,7 +209,14 @@ class NodeManagerDaemonActor(actor_class): if hasattr(record.cloud_node, "_nodemanager_recently_booted"): self.cloud_nodes.add(record) else: - record.actor.stop() + # Node disappeared from the cloud node list. Stop the monitor + # actor if necessary and forget about the node. + if record.actor: + try: + record.actor.stop() + except pykka.ActorDeadError: + pass + record.actor = None record.cloud_node = None def _register_arvados_node(self, key, arv_node): @@ -235,7 +233,7 @@ class NodeManagerDaemonActor(actor_class): def try_pairing(self): for record in self.cloud_nodes.unpaired(): for arv_rec in self.arvados_nodes.unpaired(): - if record.actor.offer_arvados_pair(arv_rec.arvados_node).get(): + if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get(): self._pair_nodes(record, arv_rec.arvados_node) break @@ -246,14 +244,27 @@ class NodeManagerDaemonActor(actor_class): return s def _node_states(self, size): - states = pykka.get_all(rec.actor.get_state() - for rec in self.cloud_nodes.nodes.itervalues() - if ((size is None or rec.cloud_node.size.id == size.id) and - rec.shutdown_actor is None)) - states += ['shutdown' for rec in self.cloud_nodes.nodes.itervalues() - if ((size is None or rec.cloud_node.size.id == size.id) and - rec.shutdown_actor is not None)] - return states + proxy_states = [] + states = [] + for rec in self.cloud_nodes.nodes.itervalues(): + if size is None or rec.cloud_node.size.id == size.id: + if rec.shutdown_actor is None and rec.actor is not None: + proxy_states.append(rec.actor.get_state()) + else: + states.append("shutdown") + return states + pykka.get_all(proxy_states) + + def _update_tracker(self): + updates = { + k: 0 + for k in status.tracker.keys() + if k.startswith('nodes_') + } + for s in self._node_states(size=None): + updates.setdefault('nodes_'+s, 0) + updates['nodes_'+s] += 1 + updates['nodes_wish'] = len(self.last_wishlist) + status.tracker.update(updates) def _state_counts(self, size): states = self._node_states(size) @@ -275,9 +286,9 @@ class NodeManagerDaemonActor(actor_class): def _total_price(self): cost = 0 - cost += sum(self.server_calculator.find_size(self.sizes_booting[c].id).price - for c in self.booting.iterkeys()) - cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price + cost += sum(self.sizes_booting[c].price + for c in self.booting.iterkeys()) + cost += sum(c.cloud_node.size.price for c in self.cloud_nodes.nodes.itervalues()) return cost @@ -338,7 +349,11 @@ class NodeManagerDaemonActor(actor_class): elif (nodes_wanted < 0) and self.booting: self._later.stop_booting_node(size) except Exception as e: - self._logger.exception("while calculating nodes wanted for size %s", size) + self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)")) + try: + self._update_tracker() + except: + self._logger.exception("while updating tracker") def _check_poll_freshness(orig_func): """Decorator to inhibit a method when poll information is stale. @@ -428,34 +443,51 @@ class NodeManagerDaemonActor(actor_class): @_check_poll_freshness def node_can_shutdown(self, node_actor): - if self._nodes_excess(node_actor.cloud_node.get().size) > 0: - self._begin_node_shutdown(node_actor, cancellable=True) - elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None: - # Node is unpaired, which means it probably exceeded its booting - # grace period without a ping, so shut it down so we can boot a new - # node in its place. - self._begin_node_shutdown(node_actor, cancellable=False) - elif node_actor.in_state('down').get(): - # Node is down and unlikely to come back. - self._begin_node_shutdown(node_actor, cancellable=False) + try: + if self._nodes_excess(node_actor.cloud_node.get().size) > 0: + self._begin_node_shutdown(node_actor, cancellable=True) + elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None: + # Node is unpaired, which means it probably exceeded its booting + # grace period without a ping, so shut it down so we can boot a new + # node in its place. + self._begin_node_shutdown(node_actor, cancellable=False) + elif node_actor.in_state('down').get(): + # Node is down and unlikely to come back. + self._begin_node_shutdown(node_actor, cancellable=False) + except pykka.ActorDeadError as e: + # The monitor actor sends shutdown suggestions every time the + # node's state is updated, and these go into the daemon actor's + # message queue. It's possible that the node has already been shut + # down (which shuts down the node monitor actor). In that case, + # this message is stale and we'll get ActorDeadError when we try to + # access node_actor. Log the error. + self._logger.debug("ActorDeadError in node_can_shutdown: %s", e) def node_finished_shutdown(self, shutdown_actor): - cloud_node, success, cancel_reason = self._get_actor_attrs( - shutdown_actor, 'cloud_node', 'success', 'cancel_reason') + try: + cloud_node, success = self._get_actor_attrs( + shutdown_actor, 'cloud_node', 'success') + except pykka.ActorDeadError: + return cloud_node_id = cloud_node.id record = self.cloud_nodes[cloud_node_id] shutdown_actor.stop() + record.shutdown_actor = None + if not success: - if cancel_reason == self._node_shutdown.NODE_BROKEN: - self.cloud_nodes.blacklist(cloud_node_id) - record.shutdown_actor = None - else: - # If the node went from being booted to being shut down without ever - # appearing in the cloud node list, it will have the - # _nodemanager_recently_booted tag, so get rid of it so that the node - # can be forgotten completely. - if hasattr(self.cloud_nodes[cloud_node_id].cloud_node, "_nodemanager_recently_booted"): - del self.cloud_nodes[cloud_node_id].cloud_node._nodemanager_recently_booted + return + + # Shutdown was successful, so stop the monitor actor, otherwise it + # will keep offering the node as a candidate for shutdown. + record.actor.stop() + record.actor = None + + # If the node went from being booted to being shut down without ever + # appearing in the cloud node list, it will have the + # _nodemanager_recently_booted tag, so get rid of it so that the node + # can be forgotten completely. + if hasattr(record.cloud_node, "_nodemanager_recently_booted"): + del record.cloud_node._nodemanager_recently_booted def shutdown(self): self._logger.info("Shutting down after signal.")