X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/caacfc031998dc73cd2f4c767e1a746b7783d379..b77893f2a8ae755f22615054f2c267d990995e1c:/services/nodemanager/arvnodeman/daemon.py

diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 366c1f8b12..b4f17849f1 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -19,13 +19,12 @@ class _ComputeNodeRecord(object):
         self.cloud_node = cloud_node
         self.arvados_node = arvados_node
         self.assignment_time = assignment_time
-
+        self.shutdown_actor = None
 
 class _BaseNodeTracker(object):
     def __init__(self):
         self.nodes = {}
         self.orphans = {}
-        self._blacklist = set()
 
     # Proxy the methods listed below to self.nodes.
     def _proxy_method(name):
@@ -44,9 +43,6 @@ class _BaseNodeTracker(object):
     def add(self, record):
         self.nodes[self.record_key(record)] = record
 
-    def blacklist(self, key):
-        self._blacklist.add(key)
-
     def update_record(self, key, item):
         setattr(self.nodes[key], self.RECORD_ATTR, item)
 
@@ -54,9 +50,7 @@
         unseen = set(self.nodes.iterkeys())
         for item in response:
             key = self.item_key(item)
-            if key in self._blacklist:
-                continue
-            elif key in unseen:
+            if key in unseen:
                 unseen.remove(key)
                 self.update_record(key, item)
             else:
@@ -67,10 +61,6 @@
         return (record for record in self.nodes.itervalues()
                 if getattr(record, self.PAIR_ATTR) is None)
 
-    def paired(self):
-        return (record for record in self.nodes.itervalues()
-                if getattr(record, self.PAIR_ATTR) is not None)
-
 
 class _CloudNodeTracker(_BaseNodeTracker):
     RECORD_ATTR = 'cloud_node'
@@ -143,9 +133,8 @@ class NodeManagerDaemonActor(actor_class):
             self.last_polls[poll_name] = -self.poll_stale_after
         self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
-        self.booting = {} # Arvados node UUID to ComputeNodeSetupActors
-        self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
-        self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
+        self.booting = {} # Actor IDs to ComputeNodeSetupActors
+        self.sizes_booting = {} # Actor IDs to node size
 
     def on_start(self):
         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
@@ -201,16 +190,33 @@
 
         self.try_pairing()
 
-        for key, record in self.cloud_nodes.orphans.iteritems():
-            if key in self.shutdowns:
+        for record in self.cloud_nodes.orphans.itervalues():
+            if record.shutdown_actor:
                 try:
-                    self.shutdowns[key].stop().get()
+                    record.shutdown_actor.stop()
                 except pykka.ActorDeadError:
                     pass
-                del self.shutdowns[key]
-                del self.sizes_booting_shutdown[key]
-            record.actor.stop()
-            record.cloud_node = None
+                record.shutdown_actor = None
+
+            # A recently booted node is a node that successfully completed the
+            # setup actor but has not yet appeared in the cloud node list.
+            # This will have the tag _nodemanager_recently_booted on it, which
+            # means (if we're not shutting it down) we want to put it back into
+            # the cloud node list. Once it really appears in the cloud list,
+            # the object in record.cloud_node will be replaced by a new one
+            # that lacks the "_nodemanager_recently_booted" tag.
+            if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
+                self.cloud_nodes.add(record)
+            else:
+                # Node disappeared from the cloud node list. Stop the monitor
+                # actor if necessary and forget about the node.
+                if record.actor:
+                    try:
+                        record.actor.stop()
+                    except pykka.ActorDeadError:
+                        pass
+                    record.actor = None
+                record.cloud_node = None
 
     def _register_arvados_node(self, key, arv_node):
         self._logger.info("Registering new Arvados node %s", key)
@@ -230,90 +236,77 @@
                     self._pair_nodes(record, arv_rec.arvados_node)
                     break
 
-
     def _nodes_booting(self, size):
         s = sum(1
                 for c in self.booting.iterkeys()
-                if size is None or self.sizes_booting_shutdown[c].id == size.id)
+                if size is None or self.sizes_booting[c].id == size.id)
         return s
 
-    def _nodes_unpaired(self, size):
-        return sum(1
-                   for c in self.cloud_nodes.unpaired()
-                   if size is None or c.cloud_node.size.id == size.id)
-
-    def _nodes_paired(self, size):
-        return sum(1
-                   for c in self.cloud_nodes.paired()
-                   if size is None or c.cloud_node.size.id == size.id)
-
-    def _nodes_down(self, size):
-        # Make sure to iterate over self.cloud_nodes because what we're
-        # counting here are compute nodes that are reported by the cloud
-        # provider but are considered "down" by Arvados.
-        return sum(1 for down in
-                   pykka.get_all(rec.actor.in_state('down') for rec in
-                                 self.cloud_nodes.nodes.itervalues()
-                                 if ((size is None or rec.cloud_node.size.id == size.id) and
-                                     rec.cloud_node.id not in self.shutdowns))
-                   if down)
-
-    def _nodes_up(self, size):
-        up = (self._nodes_booting(size) + self._nodes_unpaired(size) + self._nodes_paired(size)) - (self._nodes_down(size) + self._size_shutdowns(size))
+    def _node_states(self, size):
+        proxy_states = []
+        states = []
+        for rec in self.cloud_nodes.nodes.itervalues():
+            if size is None or rec.cloud_node.size.id == size.id:
+                if rec.shutdown_actor is None and rec.actor is not None:
+                    proxy_states.append(rec.actor.get_state())
+                else:
+                    states.append("shutdown")
+        return states + pykka.get_all(proxy_states)
+
+    def _state_counts(self, size):
+        states = self._node_states(size)
+        counts = {
+            "booting": self._nodes_booting(size),
+            "unpaired": 0,
+            "busy": 0,
+            "idle": 0,
+            "down": 0,
+            "shutdown": 0
+        }
+        for s in states:
+            counts[s] = counts[s] + 1
+        return counts
+
+    def _nodes_up(self, counts):
+        up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
         return up
 
     def _total_price(self):
         cost = 0
-        cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
-                    for c in self.booting.iterkeys())
-        cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
+        cost += sum(self.sizes_booting[c].price
+                    for c in self.booting.iterkeys())
+        cost += sum(c.cloud_node.size.price
                     for c in self.cloud_nodes.nodes.itervalues())
         return cost
 
-    def _nodes_busy(self, size):
-        return sum(1 for busy in
-                   pykka.get_all(rec.actor.in_state('busy') for rec in
-                                 self.cloud_nodes.nodes.itervalues()
-                                 if rec.cloud_node.size.id == size.id)
-                   if busy)
-
     def _size_wishlist(self, size):
         return sum(1 for c in self.last_wishlist if c.id == size.id)
 
-    def _size_shutdowns(self, size):
-        return sum(1
-                   for c in self.shutdowns.iterkeys()
-                   if size is None or self.sizes_booting_shutdown[c].id == size.id)
-
     def _nodes_wanted(self, size):
-        total_up_count = self._nodes_up(None) + self._nodes_down(None)
-        under_min = self.min_nodes - total_up_count
-        over_max = total_up_count - self.max_nodes
+        total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
+        under_min = self.min_nodes - total_node_count
+        over_max = total_node_count - self.max_nodes
         total_price = self._total_price()
 
-        if over_max >= 0:
-            return -over_max
-        elif under_min > 0 and size.id == self.min_cloud_size.id:
-            return under_min
+        counts = self._state_counts(size)
 
-        up_count = self._nodes_up(size)
-        booting_count = self._nodes_booting(size)
-        unpaired_count = self._nodes_unpaired(size)
-        paired_count = self._nodes_paired(size)
-        busy_count = self._nodes_busy(size)
-        down_count = self._nodes_down(size)
-        idle_count = paired_count - (busy_count+down_count)
-        shutdown_count = self._size_shutdowns(size)
+        up_count = self._nodes_up(counts)
+        busy_count = counts["busy"]
 
         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i",
                           size.name,
                          self._size_wishlist(size),
                           up_count,
-                          booting_count,
-                          unpaired_count,
-                          idle_count,
+                          counts["booting"],
+                          counts["unpaired"],
+                          counts["idle"],
                           busy_count,
-                          down_count,
-                          shutdown_count)
+                          counts["down"],
+                          counts["shutdown"])
+        if over_max >= 0:
+            return -over_max
+        elif under_min > 0 and size.id == self.min_cloud_size.id:
+            return under_min
 
         wanted = self._size_wishlist(size) - (up_count - busy_count)
         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
@@ -326,10 +319,11 @@
             return wanted
 
     def _nodes_excess(self, size):
-        up_count = self._nodes_up(size)
+        counts = self._state_counts(size)
+        up_count = self._nodes_up(counts)
         if size.id == self.min_cloud_size.id:
             up_count -= self.min_nodes
-        return up_count - (self._nodes_busy(size) + self._size_wishlist(size))
+        return up_count - (counts["busy"] + self._size_wishlist(size))
 
     def update_server_wishlist(self, wishlist):
         self._update_poll_time('server_wishlist')
@@ -376,7 +370,7 @@
             cloud_client=self._new_cloud(),
             cloud_size=cloud_size).proxy()
         self.booting[new_setup.actor_ref.actor_urn] = new_setup
-        self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
+        self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
 
         if arvados_node is not None:
             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
@@ -398,9 +392,10 @@
         # successful and so there isn't anything to do.
         if cloud_node is not None:
             # Node creation succeeded. Update cloud node list.
+            cloud_node._nodemanager_recently_booted = True
             self._register_cloud_node(cloud_node)
         del self.booting[setup_proxy.actor_ref.actor_urn]
-        del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
+        del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
 
     @_check_poll_freshness
     def stop_booting_node(self, size):
@@ -410,7 +405,7 @@
         for key, node in self.booting.iteritems():
             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                 del self.booting[key]
-                del self.sizes_booting_shutdown[key]
+                del self.sizes_booting[key]
 
                 if nodes_excess > 1:
                     self._later.stop_booting_node(size)
@@ -419,45 +414,54 @@
     def _begin_node_shutdown(self, node_actor, cancellable):
         cloud_node_obj = node_actor.cloud_node.get()
         cloud_node_id = cloud_node_obj.id
-        if cloud_node_id in self.shutdowns:
+        record = self.cloud_nodes[cloud_node_id]
+        if record.shutdown_actor is not None:
             return None
         shutdown = self._node_shutdown.start(
             timer_actor=self._timer, cloud_client=self._new_cloud(),
             arvados_client=self._new_arvados(),
             node_monitor=node_actor.actor_ref, cancellable=cancellable)
-        self.shutdowns[cloud_node_id] = shutdown.proxy()
-        self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
+        record.shutdown_actor = shutdown.proxy()
         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
 
     @_check_poll_freshness
     def node_can_shutdown(self, node_actor):
         if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
-            print("excess")
             self._begin_node_shutdown(node_actor, cancellable=True)
         elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
             # Node is unpaired, which means it probably exceeded its booting
             # grace period without a ping, so shut it down so we can boot a new
             # node in its place.
-            print("unpaired")
             self._begin_node_shutdown(node_actor, cancellable=False)
         elif node_actor.in_state('down').get():
             # Node is down and unlikely to come back.
             self._begin_node_shutdown(node_actor, cancellable=False)
 
     def node_finished_shutdown(self, shutdown_actor):
-        cloud_node, success, cancel_reason = self._get_actor_attrs(
-            shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
+        try:
+            cloud_node, success = self._get_actor_attrs(
+                shutdown_actor, 'cloud_node', 'success')
+        except pykka.ActorDeadError:
+            return
         cloud_node_id = cloud_node.id
+        record = self.cloud_nodes[cloud_node_id]
         shutdown_actor.stop()
+        record.shutdown_actor = None
+
         if not success:
-            if cancel_reason == self._node_shutdown.NODE_BROKEN:
-                self.cloud_nodes.blacklist(cloud_node_id)
-            del self.shutdowns[cloud_node_id]
-            del self.sizes_booting_shutdown[cloud_node_id]
-        # On success, we want to leave the entry in self.shutdowns so that it
-        # won't try to shut down the node again. It should disappear from the
-        # cloud node list, and the entry in self.shutdowns will get cleaned up
-        # by update_cloud_nodes.
+            return
+
+        # Shutdown was successful, so stop the monitor actor, otherwise it
+        # will keep offering the node as a candidate for shutdown.
+        record.actor.stop()
+        record.actor = None
+
+        # If the node went from being booted to being shut down without ever
+        # appearing in the cloud node list, it will have the
+        # _nodemanager_recently_booted tag, so get rid of it so that the node
+        # can be forgotten completely.
+        if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
+            del record.cloud_node._nodemanager_recently_booted
 
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
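
For readers of the patch: the refactor above replaces five per-state counting helpers (_nodes_unpaired, _nodes_paired, _nodes_busy, _nodes_down, _size_shutdowns) with one pass that asks each monitor actor for its state once and tallies a single counts dictionary per node size. What follows is a minimal illustrative sketch of that tallying logic, not the daemon code itself; Record, state_counts, nodes_up, and the sample data are hypothetical stand-ins for the pykka-based records and actors.

# Sketch of the _node_states()/_state_counts()/_nodes_up() flow from the
# patch, with the actor machinery replaced by plain objects.

class Record(object):
    def __init__(self, state, shutting_down=False):
        self.state = state                  # 'unpaired', 'busy', 'idle' or 'down'
        self.shutting_down = shutting_down  # stands in for record.shutdown_actor

def state_counts(records, booting):
    counts = {"booting": booting, "unpaired": 0, "busy": 0,
              "idle": 0, "down": 0, "shutdown": 0}
    for rec in records:
        # As in _node_states(): a node with a shutdown actor is tallied as
        # "shutdown" regardless of what its monitor actor would report.
        key = "shutdown" if rec.shutting_down else rec.state
        counts[key] += 1
    return counts

def nodes_up(counts):
    # As in the new _nodes_up(): "down" and "shutdown" nodes are excluded,
    # so unusable capacity no longer counts toward satisfying the wishlist.
    return counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]

records = [Record("busy"), Record("idle"), Record("down"),
           Record("busy", shutting_down=True)]
counts = state_counts(records, booting=1)
print(counts)            # one node each in busy/idle/down/shutdown, plus 1 booting
print(nodes_up(counts))  # 3

One consequence of this shape, visible in _node_states() in the diff, is that each monitor actor is consulted exactly once per poll (a single pykka.get_all over get_state() futures) rather than once per in_state() question.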
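Similarly, the _nodemanager_recently_booted tag introduced by the patch closes a window where a freshly created node has finished its setup actor but does not yet show up in the cloud provider's listing, and would otherwise be treated as an orphan and forgotten. Below is a simplified sketch of that lifecycle; CloudNode, node_setup_finished, sync_with_cloud_listing, and the plain dict are hypothetical stand-ins for the real cloud node objects, node_finished(), update_cloud_nodes(), and _CloudNodeTracker.

class CloudNode(object):
    def __init__(self, node_id):
        self.id = node_id

def node_setup_finished(cloud_node, tracked):
    # Mirrors node_finished(): tag the node so the sync step keeps it
    # around until the provider's listing catches up.
    cloud_node._nodemanager_recently_booted = True
    tracked[cloud_node.id] = cloud_node

def sync_with_cloud_listing(tracked, listed_ids):
    # Mirrors update_cloud_nodes(): a tracked node missing from the listing
    # is kept only while it carries the recently-booted tag; a node that
    # does appear is replaced by a fresh listing object, which lacks the tag.
    for node_id, node in list(tracked.items()):
        if node_id in listed_ids:
            tracked[node_id] = CloudNode(node_id)
        elif not hasattr(node, "_nodemanager_recently_booted"):
            del tracked[node_id]

tracked = {}
node_setup_finished(CloudNode("n1"), tracked)
sync_with_cloud_listing(tracked, listed_ids=set())    # kept: recently booted
assert "n1" in tracked
sync_with_cloud_listing(tracked, listed_ids={"n1"})   # replaced by listing object
assert not hasattr(tracked["n1"], "_nodemanager_recently_booted")
sync_with_cloud_listing(tracked, listed_ids=set())    # now genuinely gone
assert "n1" not in tracked

The last hunk of the patch handles the remaining corner in node_finished_shutdown(): a node shut down before it ever appeared in the listing has the tag deleted so its record can be dropped completely.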