X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/29379beaa615d5a36032a05e71d7a2730e255c48..5c549965a11b6a2ce789c1e0db9e418f695aed84:/services/nodemanager/arvnodeman/daemon.py diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py index 11204409c0..589b9a1b64 100644 --- a/services/nodemanager/arvnodeman/daemon.py +++ b/services/nodemanager/arvnodeman/daemon.py @@ -19,7 +19,7 @@ class _ComputeNodeRecord(object): self.cloud_node = cloud_node self.arvados_node = arvados_node self.assignment_time = assignment_time - + self.shutdown_actor = None class _BaseNodeTracker(object): def __init__(self): @@ -67,6 +67,10 @@ class _BaseNodeTracker(object): return (record for record in self.nodes.itervalues() if getattr(record, self.PAIR_ATTR) is None) + def paired(self): + return (record for record in self.nodes.itervalues() + if getattr(record, self.PAIR_ATTR) is not None) + class _CloudNodeTracker(_BaseNodeTracker): RECORD_ATTR = 'cloud_node' @@ -140,9 +144,7 @@ class NodeManagerDaemonActor(actor_class): self.cloud_nodes = _CloudNodeTracker() self.arvados_nodes = _ArvadosNodeTracker() self.booting = {} # Actor IDs to ComputeNodeSetupActors - self.booted = {} # Cloud node IDs to _ComputeNodeRecords - self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors - self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size + self.sizes_booting = {} # Actor IDs to node size def on_start(self): self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:])) @@ -182,137 +184,133 @@ class NodeManagerDaemonActor(actor_class): record = _ComputeNodeRecord(actor.proxy(), cloud_node) return record + def _register_cloud_node(self, node): + rec = self.cloud_nodes.get(node.id) + if rec is None: + self._logger.info("Registering new cloud node %s", node.id) + record = self._new_node(node) + self.cloud_nodes.add(record) + else: + rec.cloud_node = node + def update_cloud_nodes(self, nodelist): self._update_poll_time('cloud_nodes') - for key, node in self.cloud_nodes.update_from(nodelist): - self._logger.info("Registering new cloud node %s", key) - if key in self.booted: - record = self.booted.pop(key) - else: - record = self._new_node(node) - self.cloud_nodes.add(record) - for arv_rec in self.arvados_nodes.unpaired(): - if record.actor.offer_arvados_pair(arv_rec.arvados_node).get(): - self._pair_nodes(record, arv_rec.arvados_node) - break - for key, record in self.cloud_nodes.orphans.iteritems(): - if key in self.shutdowns: + for _, node in self.cloud_nodes.update_from(nodelist): + self._register_cloud_node(node) + + self.try_pairing() + + for record in self.cloud_nodes.orphans.itervalues(): + if record.shutdown_actor: try: - self.shutdowns[key].stop().get() + record.shutdown_actor.stop() except pykka.ActorDeadError: pass - del self.shutdowns[key] - del self.sizes_booting_shutdown[key] - record.actor.stop() - record.cloud_node = None + record.shutdown_actor = None + + # A recently booted node is a node that successfully completed the + # setup actor but has not yet appeared in the cloud node list. + # This will have the tag _nodemanager_recently_booted on it, which + # means (if we're not shutting it down) we want to put it back into + # the cloud node list. Once it really appears in the cloud list, + # the object in record.cloud_node will be replaced by a new one + # that lacks the "_nodemanager_recently_booted" tag. + if hasattr(record.cloud_node, "_nodemanager_recently_booted"): + self.cloud_nodes.add(record) + else: + record.actor.stop() + record.cloud_node = None + + def _register_arvados_node(self, key, arv_node): + self._logger.info("Registering new Arvados node %s", key) + record = _ComputeNodeRecord(arvados_node=arv_node) + self.arvados_nodes.add(record) def update_arvados_nodes(self, nodelist): self._update_poll_time('arvados_nodes') for key, node in self.arvados_nodes.update_from(nodelist): - self._logger.info("Registering new Arvados node %s", key) - record = _ComputeNodeRecord(arvados_node=node) - self.arvados_nodes.add(record) - for arv_rec in self.arvados_nodes.unpaired(): - arv_node = arv_rec.arvados_node - for cloud_rec in self.cloud_nodes.unpaired(): - if cloud_rec.actor.offer_arvados_pair(arv_node).get(): - self._pair_nodes(cloud_rec, arv_node) + self._register_arvados_node(key, node) + self.try_pairing() + + def try_pairing(self): + for record in self.cloud_nodes.unpaired(): + for arv_rec in self.arvados_nodes.unpaired(): + if record.actor.offer_arvados_pair(arv_rec.arvados_node).get(): + self._pair_nodes(record, arv_rec.arvados_node) break def _nodes_booting(self, size): s = sum(1 for c in self.booting.iterkeys() - if size is None or self.sizes_booting_shutdown[c].id == size.id) - s += sum(1 - for c in self.booted.itervalues() - if size is None or c.cloud_node.size.id == size.id) + if size is None or self.sizes_booting[c].id == size.id) return s - def _nodes_unpaired(self, size): - return sum(1 - for c in self.cloud_nodes.unpaired() - if size is None or c.cloud_node.size.id == size.id) - - def _nodes_booted(self, size): - return sum(1 - for c in self.cloud_nodes.nodes.itervalues() - if size is None or c.cloud_node.size.id == size.id) - - def _nodes_down(self, size): - # Make sure to iterate over self.cloud_nodes because what we're - # counting here are compute nodes that are reported by the cloud - # provider but are considered "down" by Arvados. - return sum(1 for down in - pykka.get_all(rec.actor.in_state('down') for rec in - self.cloud_nodes.nodes.itervalues() - if size is None or rec.cloud_node.size.id == size.id) - if down) - - def _nodes_up(self, size): - up = (self._nodes_booting(size) + self._nodes_booted(size)) - self._nodes_down(size) + def _node_states(self, size): + states = pykka.get_all(rec.actor.get_state() + for rec in self.cloud_nodes.nodes.itervalues() + if ((size is None or rec.cloud_node.size.id == size.id) and + rec.shutdown_actor is None)) + states += ['shutdown' for rec in self.cloud_nodes.nodes.itervalues() + if ((size is None or rec.cloud_node.size.id == size.id) and + rec.shutdown_actor is not None)] + return states + + def _state_counts(self, size): + states = self._node_states(size) + counts = { + "booting": self._nodes_booting(size), + "unpaired": 0, + "busy": 0, + "idle": 0, + "down": 0, + "shutdown": 0 + } + for s in states: + counts[s] = counts[s] + 1 + return counts + + def _nodes_up(self, counts): + up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"] return up def _total_price(self): cost = 0 - cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price + cost += sum(self.server_calculator.find_size(self.sizes_booting[c].id).price for c in self.booting.iterkeys()) cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price - for i in (self.booted, self.cloud_nodes.nodes) - for c in i.itervalues()) + for c in self.cloud_nodes.nodes.itervalues()) return cost - def _nodes_busy(self, size): - return sum(1 for busy in - pykka.get_all(rec.actor.in_state('busy') for rec in - self.cloud_nodes.nodes.itervalues() - if rec.cloud_node.size.id == size.id) - if busy) - - def _nodes_missing(self, size): - return sum(1 for arv_node in - pykka.get_all(rec.actor.arvados_node for rec in - self.cloud_nodes.nodes.itervalues() - if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns) - if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after)) - def _size_wishlist(self, size): return sum(1 for c in self.last_wishlist if c.id == size.id) - def _size_shutdowns(self, size): - sh = 0 - for c in self.shutdowns.iterkeys(): - try: - if self.sizes_booting_shutdown[c].id == size.id: - sh += 1 - except pykka.ActorDeadError: - pass - return sh - def _nodes_wanted(self, size): - total_up_count = self._nodes_up(None) - under_min = self.min_nodes - total_up_count - over_max = total_up_count - self.max_nodes + total_node_count = self._nodes_booting(None) + len(self.cloud_nodes) + under_min = self.min_nodes - total_node_count + over_max = total_node_count - self.max_nodes total_price = self._total_price() - if over_max >= 0: - return -over_max - elif under_min > 0 and size.id == self.min_cloud_size.id: - return under_min + counts = self._state_counts(size) - booting_count = self._nodes_booting(size) + self._nodes_unpaired(size) - shutdown_count = self._size_shutdowns(size) - busy_count = self._nodes_busy(size) - idle_count = self._nodes_up(size) - (busy_count + self._nodes_missing(size)) + up_count = self._nodes_up(counts) + busy_count = counts["busy"] - self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name, + self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name, self._size_wishlist(size), - idle_count + busy_count, - booting_count, - idle_count - booting_count, + up_count, + counts["booting"], + counts["unpaired"], + counts["idle"], busy_count, - shutdown_count) + counts["down"], + counts["shutdown"]) + + if over_max >= 0: + return -over_max + elif under_min > 0 and size.id == self.min_cloud_size.id: + return under_min - wanted = self._size_wishlist(size) - idle_count + wanted = self._size_wishlist(size) - (up_count - busy_count) if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price): can_boot = int((self.max_total_price - total_price) / size.price) if can_boot == 0: @@ -323,10 +321,11 @@ class NodeManagerDaemonActor(actor_class): return wanted def _nodes_excess(self, size): - up_count = (self._nodes_booting(size) + self._nodes_booted(size)) - self._size_shutdowns(size) + counts = self._state_counts(size) + up_count = self._nodes_up(counts) if size.id == self.min_cloud_size.id: up_count -= self.min_nodes - return up_count - self._nodes_busy(size) - self._size_wishlist(size) + return up_count - (counts["busy"] + self._size_wishlist(size)) def update_server_wishlist(self, wishlist): self._update_poll_time('server_wishlist') @@ -373,7 +372,7 @@ class NodeManagerDaemonActor(actor_class): cloud_client=self._new_cloud(), cloud_size=cloud_size).proxy() self.booting[new_setup.actor_ref.actor_urn] = new_setup - self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size + self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size if arvados_node is not None: self.arvados_nodes[arvados_node['uuid']].assignment_time = ( @@ -386,18 +385,19 @@ class NodeManagerDaemonActor(actor_class): return pykka.get_all([getattr(actor, name) for name in attr_names]) def node_up(self, setup_proxy): - cloud_node = setup_proxy.cloud_node.get() - del self.booting[setup_proxy.actor_ref.actor_urn] - del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn] - + # Called when a SetupActor has completed. + cloud_node, arvados_node = self._get_actor_attrs( + setup_proxy, 'cloud_node', 'arvados_node') setup_proxy.stop() + + # If cloud_node is None then the node create wasn't + # successful and so there isn't anything to do. if cloud_node is not None: - record = self.cloud_nodes.get(cloud_node.id) - if record is None: - record = self._new_node(cloud_node) - self.booted[cloud_node.id] = record - self._timer.schedule(time.time() + self.boot_fail_after, - self._later.shutdown_unpaired_node, cloud_node.id) + # Node creation succeeded. Update cloud node list. + cloud_node._nodemanager_recently_booted = True + self._register_cloud_node(cloud_node) + del self.booting[setup_proxy.actor_ref.actor_urn] + del self.sizes_booting[setup_proxy.actor_ref.actor_urn] @_check_poll_freshness def stop_booting_node(self, size): @@ -405,9 +405,9 @@ class NodeManagerDaemonActor(actor_class): if (nodes_excess < 1) or not self.booting: return None for key, node in self.booting.iteritems(): - if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get(): + if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get(): del self.booting[key] - del self.sizes_booting_shutdown[key] + del self.sizes_booting[key] if nodes_excess > 1: self._later.stop_booting_node(size) @@ -416,43 +416,48 @@ class NodeManagerDaemonActor(actor_class): def _begin_node_shutdown(self, node_actor, cancellable): cloud_node_obj = node_actor.cloud_node.get() cloud_node_id = cloud_node_obj.id - if cloud_node_id in self.shutdowns: + record = self.cloud_nodes[cloud_node_id] + if record.shutdown_actor is not None: return None shutdown = self._node_shutdown.start( timer_actor=self._timer, cloud_client=self._new_cloud(), arvados_client=self._new_arvados(), node_monitor=node_actor.actor_ref, cancellable=cancellable) - self.shutdowns[cloud_node_id] = shutdown.proxy() - self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size + record.shutdown_actor = shutdown.proxy() shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown) @_check_poll_freshness def node_can_shutdown(self, node_actor): if self._nodes_excess(node_actor.cloud_node.get().size) > 0: + print("excess") self._begin_node_shutdown(node_actor, cancellable=True) - - def shutdown_unpaired_node(self, cloud_node_id): - for record_dict in [self.cloud_nodes, self.booted]: - if cloud_node_id in record_dict: - record = record_dict[cloud_node_id] - break - else: - return None - if not record.actor.in_state('idle', 'busy').get(): - self._begin_node_shutdown(record.actor, cancellable=False) + elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None: + # Node is unpaired, which means it probably exceeded its booting + # grace period without a ping, so shut it down so we can boot a new + # node in its place. + print("unpaired") + self._begin_node_shutdown(node_actor, cancellable=False) + elif node_actor.in_state('down').get(): + # Node is down and unlikely to come back. + self._begin_node_shutdown(node_actor, cancellable=False) def node_finished_shutdown(self, shutdown_actor): cloud_node, success, cancel_reason = self._get_actor_attrs( shutdown_actor, 'cloud_node', 'success', 'cancel_reason') - shutdown_actor.stop() cloud_node_id = cloud_node.id + record = self.cloud_nodes[cloud_node_id] + shutdown_actor.stop() if not success: if cancel_reason == self._node_shutdown.NODE_BROKEN: self.cloud_nodes.blacklist(cloud_node_id) - elif cloud_node_id in self.booted: - self.booted.pop(cloud_node_id).actor.stop() - del self.shutdowns[cloud_node_id] - del self.sizes_booting_shutdown[cloud_node_id] + record.shutdown_actor = None + else: + # If the node went from being booted to being shut down without ever + # appearing in the cloud node list, it will have the + # _nodemanager_recently_booted tag, so get rid of it so that the node + # can be forgotten completely. + if hasattr(self.cloud_nodes[cloud_node_id].cloud_node, "_nodemanager_recently_booted"): + del self.cloud_nodes[cloud_node_id].cloud_node._nodemanager_recently_booted def shutdown(self): self._logger.info("Shutting down after signal.")