X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ab9a73d2c0b567d3c05d1d4d8463633a69eafda2..080c940d7a8134a6e277a53b7e45eb27e2b2c87f:/services/nodemanager/arvnodeman/daemon.py

diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 5522877bc0..1edf4dc479 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
@@ -75,7 +78,10 @@ class _ArvadosNodeTracker(_BaseNodeTracker):
     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
 
     def find_stale_node(self, stale_time):
-        for record in self.nodes.itervalues():
+        # Try to select a stale node record that has an assigned slot first
+        for record in sorted(self.nodes.itervalues(),
+                             key=lambda r: r.arvados_node['slot_number'],
+                             reverse=True):
             node = record.arvados_node
             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                           stale_time) and
@@ -106,7 +112,8 @@ class NodeManagerDaemonActor(actor_class):
                  node_setup_class=dispatch.ComputeNodeSetupActor,
                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                  node_actor_class=dispatch.ComputeNodeMonitorActor,
-                 max_total_price=0):
+                 max_total_price=0,
+                 consecutive_idle_count=1):
         super(NodeManagerDaemonActor, self).__init__()
         self._node_setup = node_setup_class
         self._node_shutdown = node_shutdown_class
@@ -122,10 +129,12 @@ class NodeManagerDaemonActor(actor_class):
         self.min_cloud_size = self.server_calculator.cheapest_size()
         self.min_nodes = min_nodes
         self.max_nodes = max_nodes
+        self.node_quota = max_nodes
         self.max_total_price = max_total_price
         self.poll_stale_after = poll_stale_after
         self.boot_fail_after = boot_fail_after
         self.node_stale_after = node_stale_after
+        self.consecutive_idle_count = consecutive_idle_count
         self.last_polls = {}
         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
             poll_actor = locals()[poll_name + '_actor']
@@ -145,8 +154,8 @@ class NodeManagerDaemonActor(actor_class):
             self.last_polls[poll_key] = time.time()
 
     def _pair_nodes(self, node_record, arvados_node):
-        self._logger.info("Cloud node %s is now paired with Arvados node %s",
-                          node_record.cloud_node.name, arvados_node['uuid'])
+        self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
+                          node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
         self._arvados_nodes_actor.subscribe_to(
             arvados_node['uuid'], node_record.actor.update_arvados_node)
         node_record.arvados_node = arvados_node
@@ -160,14 +169,14 @@ class NodeManagerDaemonActor(actor_class):
             cloud_node=cloud_node,
             cloud_node_start_time=start_time,
             shutdown_timer=shutdown_timer,
-            cloud_fqdn_func=self._cloud_driver.node_fqdn,
             update_actor=self._cloud_updater,
             timer_actor=self._timer,
             arvados_node=None,
             poll_stale_after=self.poll_stale_after,
             node_stale_after=self.node_stale_after,
             cloud_client=self._cloud_driver,
-            boot_fail_after=self.boot_fail_after)
+            boot_fail_after=self.boot_fail_after,
+            consecutive_idle_count=self.consecutive_idle_count)
         actorTell = actor.tell_proxy()
         actorTell.subscribe(self._later.node_can_shutdown)
         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
@@ -209,8 +218,11 @@ class NodeManagerDaemonActor(actor_class):
             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
                 self.cloud_nodes.add(record)
             else:
-                # Node disappeared from the cloud node list.  Stop the monitor
-                # actor if necessary and forget about the node.
+                # Node disappeared from the cloud node list. If it's paired,
+                # remove its idle time counter.
+                if record.arvados_node:
+                    status.tracker.idle_out(record.arvados_node.get('hostname'))
+                # Stop the monitor actor if necessary and forget about the node.
                 if record.actor:
                     try:
                         record.actor.stop()
@@ -263,6 +275,8 @@ class NodeManagerDaemonActor(actor_class):
         for s in self._node_states(size=None):
             updates.setdefault('nodes_'+s, 0)
             updates['nodes_'+s] += 1
+        updates['nodes_wish'] = len(self.last_wishlist)
+        updates['node_quota'] = self.node_quota
         status.tracker.update(updates)
 
     def _state_counts(self, size):
@@ -272,6 +286,7 @@ class NodeManagerDaemonActor(actor_class):
             "unpaired": 0,
             "busy": 0,
             "idle": 0,
+            "fail": 0,
             "down": 0,
             "shutdown": 0
         }
@@ -297,22 +312,23 @@ class NodeManagerDaemonActor(actor_class):
     def _nodes_wanted(self, size):
         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
         under_min = self.min_nodes - total_node_count
-        over_max = total_node_count - self.max_nodes
+        over_max = total_node_count - self.node_quota
         total_price = self._total_price()
 
         counts = self._state_counts(size)
 
         up_count = self._nodes_up(counts)
         busy_count = counts["busy"]
+        wishlist_count = self._size_wishlist(size)
 
-        self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
-                          self._size_wishlist(size),
+        self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.id,
+                          wishlist_count,
                           up_count,
                           counts["booting"],
                           counts["unpaired"],
                           counts["idle"],
                           busy_count,
-                          counts["down"],
+                          counts["down"]+counts["fail"],
                           counts["shutdown"])
 
         if over_max >= 0:
@@ -320,12 +336,12 @@ class NodeManagerDaemonActor(actor_class):
         elif under_min > 0 and size.id == self.min_cloud_size.id:
             return under_min
 
-        wanted = self._size_wishlist(size) - (up_count - busy_count)
+        wanted = wishlist_count - (up_count - busy_count)
         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
             can_boot = int((self.max_total_price - total_price) / size.price)
             if can_boot == 0:
                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
-                                  size.name, size.price, self.max_total_price, total_price)
+                                  size.id, size.price, self.max_total_price, total_price)
             return can_boot
         else:
             return wanted
@@ -339,7 +355,8 @@ class NodeManagerDaemonActor(actor_class):
 
     def update_server_wishlist(self, wishlist):
         self._update_poll_time('server_wishlist')
-        self.last_wishlist = wishlist
+        requestable_nodes = self.node_quota - (self._nodes_booting(None) + len(self.cloud_nodes))
+        self.last_wishlist = wishlist[:requestable_nodes]
         for size in reversed(self.server_calculator.cloud_sizes):
             try:
                 nodes_wanted = self._nodes_wanted(size)
                 if nodes_wanted > 0:
                     self._later.start_node(size)
                 elif (nodes_wanted < 0) and self.booting:
                     self._later.stop_booting_node(size)
-            except Exception as e:
+            except Exception:
                 self._logger.exception("while calculating nodes wanted for size %s",
                                        getattr(size, "id", "(id not available)"))
         try:
             self._update_tracker()
@@ -376,40 +393,66 @@ class NodeManagerDaemonActor(actor_class):
         nodes_wanted = self._nodes_wanted(cloud_size)
         if nodes_wanted < 1:
             return None
-        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
-        self._logger.info("Want %i more %s nodes.  Booting a node.",
-                          nodes_wanted, cloud_size.name)
-        new_setup = self._node_setup.start(
-            timer_actor=self._timer,
-            arvados_client=self._new_arvados(),
-            arvados_node=arvados_node,
-            cloud_client=self._new_cloud(),
-            cloud_size=cloud_size).proxy()
-        self.booting[new_setup.actor_ref.actor_urn] = new_setup
-        self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
-
-        if arvados_node is not None:
-            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
-                time.time())
-        new_setup.subscribe(self._later.node_up)
+
+        if not self.cancel_node_shutdown(cloud_size):
+            arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
+            self._logger.info("Want %i more %s nodes.  Booting a node.",
+                              nodes_wanted, cloud_size.id)
+            new_setup = self._node_setup.start(
+                timer_actor=self._timer,
+                arvados_client=self._new_arvados(),
+                arvados_node=arvados_node,
+                cloud_client=self._new_cloud(),
+                cloud_size=self.server_calculator.find_size(cloud_size.id))
+            self.booting[new_setup.actor_urn] = new_setup.proxy()
+            self.sizes_booting[new_setup.actor_urn] = cloud_size
+
+            if arvados_node is not None:
+                self.arvados_nodes[arvados_node['uuid']].assignment_time = (
+                    time.time())
+            new_setup.tell_proxy().subscribe(self._later.node_setup_finished)
+
         if nodes_wanted > 1:
             self._later.start_node(cloud_size)
 
     def _get_actor_attrs(self, actor, *attr_names):
         return pykka.get_all([getattr(actor, name) for name in attr_names])
 
-    def node_up(self, setup_proxy):
+    def node_setup_finished(self, setup_proxy):
         # Called when a SetupActor has completed.
-        cloud_node, arvados_node = self._get_actor_attrs(
-            setup_proxy, 'cloud_node', 'arvados_node')
+        cloud_node, arvados_node, error = self._get_actor_attrs(
+            setup_proxy, 'cloud_node', 'arvados_node', 'error')
         setup_proxy.stop()
-        # If cloud_node is None then the node create wasn't
-        # successful and so there isn't anything to do.
-        if cloud_node is not None:
+        if cloud_node is None:
+            # If cloud_node is None then the node create wasn't successful.
+            if error == dispatch.QuotaExceeded:
+                # We've hit a quota limit, so adjust node_quota to stop trying to
+                # boot new nodes until the node count goes down.
+                self.node_quota = len(self.cloud_nodes)
+                self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
+        else:
+            # Node creation succeeded. Update cloud node list.
             cloud_node._nodemanager_recently_booted = True
             self._register_cloud_node(cloud_node)
+
+            # Different quota policies may be in force depending on the cloud
+            # provider, account limits, and the specific mix of node sizes
+            # that are already created.  If we are right at the quota limit,
+            # we want to probe to see if the last quota still applies or if we
+            # are allowed to create more nodes.
+            #
+            # For example, if the quota is actually based on core count, the
+            # quota might be 20 single-core machines or 10 dual-core machines.
+            # If we previously set node_quota to 10 dual-core machines, but are
+            # now booting single-core machines (actual quota 20), we want to
+            # allow the quota to expand so we don't get stuck at 10 machines
+            # forever.
+            if len(self.cloud_nodes) >= self.node_quota:
+                self.node_quota = len(self.cloud_nodes)+1
+                self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
+
+        self.node_quota = min(self.node_quota, self.max_nodes)
         del self.booting[setup_proxy.actor_ref.actor_urn]
         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
@@ -419,13 +462,28 @@ class NodeManagerDaemonActor(actor_class):
         if (nodes_excess < 1) or not self.booting:
             return None
         for key, node in self.booting.iteritems():
-            if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
-                del self.booting[key]
-                del self.sizes_booting[key]
+            try:
+                if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get(2):
+                    del self.booting[key]
+                    del self.sizes_booting[key]
+                    if nodes_excess > 1:
+                        self._later.stop_booting_node(size)
+                    return
+            except pykka.Timeout:
+                pass
 
-                if nodes_excess > 1:
-                    self._later.stop_booting_node(size)
-                break
+    @_check_poll_freshness
+    def cancel_node_shutdown(self, size):
+        # Go through shutdown actors and see if there are any of the appropriate size that can be cancelled
+        for record in self.cloud_nodes.nodes.itervalues():
+            try:
+                if (record.shutdown_actor is not None and
+                    record.cloud_node.size.id == size.id and
+                    record.shutdown_actor.cancel_shutdown("Node size is in wishlist").get(2)):
+                        return True
+            except (pykka.ActorDeadError, pykka.Timeout) as e:
+                pass
+        return False
 
     def _begin_node_shutdown(self, node_actor, cancellable):
         cloud_node_obj = node_actor.cloud_node.get()
@@ -450,7 +508,7 @@ class NodeManagerDaemonActor(actor_class):
             # grace period without a ping, so shut it down so we can boot a new
             # node in its place.
             self._begin_node_shutdown(node_actor, cancellable=False)
-        elif node_actor.in_state('down').get():
+        elif node_actor.in_state('down', 'fail').get():
             # Node is down and unlikely to come back.
             self._begin_node_shutdown(node_actor, cancellable=False)
         except pykka.ActorDeadError as e:
@@ -469,8 +527,19 @@ class NodeManagerDaemonActor(actor_class):
         except pykka.ActorDeadError:
             return
         cloud_node_id = cloud_node.id
-        record = self.cloud_nodes[cloud_node_id]
-        shutdown_actor.stop()
+
+        try:
+            shutdown_actor.stop()
+        except pykka.ActorDeadError:
+            pass
+
+        try:
+            record = self.cloud_nodes[cloud_node_id]
+        except KeyError:
+            # Cloud node was already removed from the cloud node list
+            # supposedly while the destroy_node call was finishing its
+            # job.
+            return
         record.shutdown_actor = None
 
         if not success:
@@ -491,6 +560,16 @@ class NodeManagerDaemonActor(actor_class):
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
+
+        # Shut down pollers
+        self._server_wishlist_actor.stop()
+        self._arvados_nodes_actor.stop()
+        self._cloud_nodes_actor.stop()
+
+        # Clear cloud node list
+        self.update_cloud_nodes([])
+
+        # Stop setup actors unless they are in the middle of setup.
         setup_stops = {key: node.stop_if_no_cloud_node()
                        for key, node in self.booting.iteritems()}
         self.booting = {key: self.booting[key]
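
The node_quota handling added in node_setup_finished() above amounts to a small back-off-and-probe policy. Here is a minimal sketch of that policy in isolation; QuotaTracker is a hypothetical helper for illustration only, not part of the NodeManagerDaemonActor API:

# Sketch of the quota back-off/probe behaviour described in the comments above.
# Assumes the caller reports the current cloud node count on each event.
class QuotaTracker(object):
    def __init__(self, max_nodes):
        self.max_nodes = max_nodes
        self.node_quota = max_nodes            # start out optimistic

    def boot_failed_quota_exceeded(self, cloud_node_count):
        # The cloud refused to create a node: treat whatever is running now
        # as the effective quota and stop asking for more until nodes go away.
        self.node_quota = min(cloud_node_count, self.max_nodes)

    def boot_succeeded(self, cloud_node_count):
        # A boot at (or past) the assumed ceiling worked, so the earlier limit
        # no longer applies (for example, a core-based quota that was hit with
        # larger node sizes).  Probe upward by one node, never past max_nodes.
        if cloud_node_count >= self.node_quota:
            self.node_quota = cloud_node_count + 1
        self.node_quota = min(self.node_quota, self.max_nodes)

For example, with max_nodes=10, a QuotaExceeded error while 4 nodes exist pins node_quota at 4; once a later boot succeeds at that ceiling, node_quota moves to 5, and it keeps creeping back up toward 10 as boots continue to succeed.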