X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e4c7e6b368cf6d922db341580a2402a07c6cb079..9fdfd5c5b229ea42193710f891e953b452bd90e7:/services/nodemanager/arvnodeman/daemon.py

diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 8f9207e3ba..ca3029d9e1 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
@@ -75,7 +78,10 @@ class _ArvadosNodeTracker(_BaseNodeTracker):
     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
 
     def find_stale_node(self, stale_time):
-        for record in self.nodes.itervalues():
+        # Try to select a stale node record that has an assigned slot first
+        for record in sorted(self.nodes.itervalues(),
+                             key=lambda r: r.arvados_node['slot_number'],
+                             reverse=True):
             node = record.arvados_node
             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                           stale_time) and
@@ -146,8 +152,8 @@ class NodeManagerDaemonActor(actor_class):
         self.last_polls[poll_key] = time.time()
 
     def _pair_nodes(self, node_record, arvados_node):
-        self._logger.info("Cloud node %s is now paired with Arvados node %s",
-                          node_record.cloud_node.name, arvados_node['uuid'])
+        self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
+                          node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
         self._arvados_nodes_actor.subscribe_to(
             arvados_node['uuid'], node_record.actor.update_arvados_node)
         node_record.arvados_node = arvados_node
@@ -387,7 +393,7 @@ class NodeManagerDaemonActor(actor_class):
             arvados_client=self._new_arvados(),
             arvados_node=arvados_node,
             cloud_client=self._new_cloud(),
-            cloud_size=cloud_size).proxy()
+            cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
         self.booting[new_setup.actor_ref.actor_urn] = new_setup
         self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
 
@@ -407,14 +413,13 @@ class NodeManagerDaemonActor(actor_class):
             setup_proxy, 'cloud_node', 'arvados_node', 'error')
         setup_proxy.stop()
 
-        total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
         if cloud_node is None:
             # If cloud_node is None then the node create wasn't successful.
             if error == dispatch.QuotaExceeded:
                 # We've hit a quota limit, so adjust node_quota to stop trying to
                 # boot new nodes until the node count goes down.
-                self.node_quota = min(total_node_count-1, self.max_nodes)
-                self._logger.warning("Setting node quota to %s", self.node_quota)
+                self.node_quota = len(self.cloud_nodes)
+                self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
         else:
             # Node creation succeeded. Update cloud node list.
             cloud_node._nodemanager_recently_booted = True
@@ -432,8 +437,11 @@ class NodeManagerDaemonActor(actor_class):
             # now booting single core machines (actual quota 20), we want to
             # allow the quota to expand so we don't get stuck at 10 machines
             # forever.
-            if total_node_count == self.node_quota and self.node_quota < self.max_nodes:
-                self.node_quota += 1
+            if len(self.cloud_nodes) >= self.node_quota:
+                self.node_quota = len(self.cloud_nodes)+1
+                self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
+
+        self.node_quota = min(self.node_quota, self.max_nodes)
 
         del self.booting[setup_proxy.actor_ref.actor_urn]
         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
@@ -493,8 +501,19 @@ class NodeManagerDaemonActor(actor_class):
         except pykka.ActorDeadError:
             return
         cloud_node_id = cloud_node.id
-        record = self.cloud_nodes[cloud_node_id]
-        shutdown_actor.stop()
+
+        try:
+            shutdown_actor.stop()
+        except pykka.ActorDeadError:
+            pass
+
+        try:
+            record = self.cloud_nodes[cloud_node_id]
+        except KeyError:
+            # Cloud node was already removed from the cloud node list,
+            # presumably while the destroy_node call was finishing its
+            # job.
+            return
         record.shutdown_actor = None
 
         if not success:
@@ -515,6 +534,16 @@ class NodeManagerDaemonActor(actor_class):
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
+
+        # Shut down pollers
+        self._server_wishlist_actor.stop()
+        self._arvados_nodes_actor.stop()
+        self._cloud_nodes_actor.stop()
+
+        # Clear cloud node list
+        self.update_cloud_nodes([])
+
+        # Stop setup actors unless they are in the middle of setup.
         setup_stops = {key: node.stop_if_no_cloud_node()
                        for key, node in self.booting.iteritems()}
         self.booting = {key: self.booting[key]
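
The find_stale_node hunk above leans on Python 2 comparison rules: None sorts below any number, so reverse-sorting on 'slot_number' puts records with an assigned slot ahead of records that have none. A minimal standalone sketch of that ordering, using made-up records rather than the daemon's _ArvadosNodeTracker entries:

from __future__ import print_function

# Illustrative records only; the daemon sorts tracker records by
# r.arvados_node['slot_number'] in the same way.
records = [
    {'uuid': 'node-a', 'slot_number': None},
    {'uuid': 'node-b', 'slot_number': 14},
    {'uuid': 'node-c', 'slot_number': 3},
]

# Under Python 2, None compares less than any integer, so reverse=True
# yields node-b (14), node-c (3), then node-a (None) last.  Python 3 would
# raise TypeError here; nodemanager is Python 2 code.
for rec in sorted(records, key=lambda r: r['slot_number'], reverse=True):
    print(rec['uuid'], rec['slot_number'])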
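
The node_finished_setup hunks drop the old total_node_count bookkeeping in favor of a self-adjusting quota: a QuotaExceeded error clamps node_quota down to the number of cloud nodes that actually exist, a successful boot while at or above the quota probes one node higher, and the result is always capped at max_nodes. A condensed, hypothetical sketch of that policy (the free function and its argument names are illustrative, not the daemon's attributes):

def adjust_node_quota(node_quota, max_nodes, cloud_node_count,
                      boot_succeeded, quota_exceeded):
    # Hypothetical condensation of the quota handling in the diff above.
    if not boot_succeeded and quota_exceeded:
        # The cloud refused the create call: stop asking for more nodes
        # than it is currently willing to grant.
        node_quota = cloud_node_count
    elif boot_succeeded and cloud_node_count >= node_quota:
        # A boot succeeded while at the assumed limit, so the real quota
        # must be higher; probe upward one node at a time.
        node_quota = cloud_node_count + 1
    # Never exceed the configured hard ceiling.
    return min(node_quota, max_nodes)

# Example: a quota error with 8 nodes running drops the quota to 8; a later
# successful boot while at 8 nodes raises it to 9 (still capped by max_nodes).
assert adjust_node_quota(10, 20, 8, False, True) == 8
assert adjust_node_quota(8, 20, 8, True, False) == 9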