X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d14d34b534e6914dea107878212436dd995c6560..d97388bdbfeb6a43cb86996012a1db0ba4a8871f:/services/nodemanager/arvnodeman/daemon.py

diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index dbc3273e1c..1edf4dc479 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
@@ -9,6 +12,7 @@ import time
 import pykka
 
 from . import computenode as cnode
+from . import status
 from .computenode import dispatch
 from .config import actor_class
 
@@ -19,13 +23,12 @@ class _ComputeNodeRecord(object):
         self.cloud_node = cloud_node
         self.arvados_node = arvados_node
         self.assignment_time = assignment_time
-
+        self.shutdown_actor = None
 
 class _BaseNodeTracker(object):
     def __init__(self):
         self.nodes = {}
         self.orphans = {}
-        self._blacklist = set()
 
     # Proxy the methods listed below to self.nodes.
     def _proxy_method(name):
@@ -44,9 +47,6 @@ class _BaseNodeTracker(object):
     def add(self, record):
         self.nodes[self.record_key(record)] = record
 
-    def blacklist(self, key):
-        self._blacklist.add(key)
-
     def update_record(self, key, item):
         setattr(self.nodes[key], self.RECORD_ATTR, item)
 
@@ -54,9 +54,7 @@ class _BaseNodeTracker(object):
         unseen = set(self.nodes.iterkeys())
         for item in response:
             key = self.item_key(item)
-            if key in self._blacklist:
-                continue
-            elif key in unseen:
+            if key in unseen:
                 unseen.remove(key)
                 self.update_record(key, item)
             else:
@@ -80,7 +78,10 @@ class _ArvadosNodeTracker(_BaseNodeTracker):
     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
 
     def find_stale_node(self, stale_time):
-        for record in self.nodes.itervalues():
+        # Try to select a stale node record that has an assigned slot first
+        for record in sorted(self.nodes.itervalues(),
+                             key=lambda r: r.arvados_node['slot_number'],
+                             reverse=True):
             node = record.arvados_node
             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                           stale_time) and
@@ -111,7 +112,8 @@ class NodeManagerDaemonActor(actor_class):
                  node_setup_class=dispatch.ComputeNodeSetupActor,
                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                  node_actor_class=dispatch.ComputeNodeMonitorActor,
-                 max_total_price=0):
+                 max_total_price=0,
+                 consecutive_idle_count=1):
         super(NodeManagerDaemonActor, self).__init__()
         self._node_setup = node_setup_class
         self._node_shutdown = node_shutdown_class
@@ -121,17 +123,18 @@ class NodeManagerDaemonActor(actor_class):
         self._new_arvados = arvados_factory
         self._new_cloud = cloud_factory
         self._cloud_driver = self._new_cloud()
-        self._logger = logging.getLogger('arvnodeman.daemon')
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
        self.shutdown_windows = shutdown_windows
         self.server_calculator = server_calculator
         self.min_cloud_size = self.server_calculator.cheapest_size()
         self.min_nodes = min_nodes
         self.max_nodes = max_nodes
+        self.node_quota = max_nodes
         self.max_total_price = max_total_price
         self.poll_stale_after = poll_stale_after
         self.boot_fail_after = boot_fail_after
         self.node_stale_after = node_stale_after
+        self.consecutive_idle_count = consecutive_idle_count
         self.last_polls = {}
         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
             poll_actor = locals()[poll_name + '_actor']
@@ -141,16 +144,18 @@
         self.cloud_nodes = _CloudNodeTracker()
         self.arvados_nodes = _ArvadosNodeTracker()
         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
-        self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
-        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
-        self._logger.debug("Daemon initialized")
+        self.sizes_booting = {} # Actor IDs to node size
+
+    def on_start(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+        self._logger.debug("Daemon started")
 
     def _update_poll_time(self, poll_key):
         self.last_polls[poll_key] = time.time()
 
     def _pair_nodes(self, node_record, arvados_node):
-        self._logger.info("Cloud node %s has associated with Arvados node %s",
-                          node_record.cloud_node.id, arvados_node['uuid'])
+        self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
+                          node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
         self._arvados_nodes_actor.subscribe_to(
             arvados_node['uuid'], node_record.actor.update_arvados_node)
         node_record.arvados_node = arvados_node
@@ -164,143 +169,207 @@ class NodeManagerDaemonActor(actor_class):
             cloud_node=cloud_node,
             cloud_node_start_time=start_time,
             shutdown_timer=shutdown_timer,
-            cloud_fqdn_func=self._cloud_driver.node_fqdn,
             update_actor=self._cloud_updater,
             timer_actor=self._timer,
             arvados_node=None,
             poll_stale_after=self.poll_stale_after,
             node_stale_after=self.node_stale_after,
             cloud_client=self._cloud_driver,
-            boot_fail_after=self.boot_fail_after).proxy()
-        actor.subscribe(self._later.node_can_shutdown)
+            boot_fail_after=self.boot_fail_after,
+            consecutive_idle_count=self.consecutive_idle_count)
+        actorTell = actor.tell_proxy()
+        actorTell.subscribe(self._later.node_can_shutdown)
         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
-                                             actor.update_cloud_node)
-        record = _ComputeNodeRecord(actor, cloud_node)
+                                             actorTell.update_cloud_node)
+        record = _ComputeNodeRecord(actor.proxy(), cloud_node)
         return record
 
+    def _register_cloud_node(self, node):
+        rec = self.cloud_nodes.get(node.id)
+        if rec is None:
+            self._logger.info("Registering new cloud node %s", node.id)
+            record = self._new_node(node)
+            self.cloud_nodes.add(record)
+        else:
+            rec.cloud_node = node
+
     def update_cloud_nodes(self, nodelist):
         self._update_poll_time('cloud_nodes')
-        for key, node in self.cloud_nodes.update_from(nodelist):
-            self._logger.info("Registering new cloud node %s", key)
-            if key in self.booted:
-                record = self.booted.pop(key)
-            else:
-                record = self._new_node(node)
-            self.cloud_nodes.add(record)
-            for arv_rec in self.arvados_nodes.unpaired():
-                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
-                    self._pair_nodes(record, arv_rec.arvados_node)
-                    break
-        for key, record in self.cloud_nodes.orphans.iteritems():
-            if key in self.shutdowns:
+        for _, node in self.cloud_nodes.update_from(nodelist):
+            self._register_cloud_node(node)
+
+        self.try_pairing()
+
+        for record in self.cloud_nodes.orphans.itervalues():
+            if record.shutdown_actor:
                 try:
-                    self.shutdowns[key].stop().get()
+                    record.shutdown_actor.stop()
                 except pykka.ActorDeadError:
                     pass
-                del self.shutdowns[key]
-            record.actor.stop()
-            record.cloud_node = None
+                record.shutdown_actor = None
+
+            # A recently booted node is a node that successfully completed the
+            # setup actor but has not yet appeared in the cloud node list.
+            # This will have the tag _nodemanager_recently_booted on it, which
+            # means (if we're not shutting it down) we want to put it back into
+            # the cloud node list. Once it really appears in the cloud list,
+            # the object in record.cloud_node will be replaced by a new one
+            # that lacks the "_nodemanager_recently_booted" tag.
+            if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
+                self.cloud_nodes.add(record)
+            else:
+                # Node disappeared from the cloud node list. If it's paired,
+                # remove its idle time counter.
+                if record.arvados_node:
+                    status.tracker.idle_out(record.arvados_node.get('hostname'))
+                # Stop the monitor actor if necessary and forget about the node.
+                if record.actor:
+                    try:
+                        record.actor.stop()
+                    except pykka.ActorDeadError:
+                        pass
+                    record.actor = None
+                record.cloud_node = None
+
+    def _register_arvados_node(self, key, arv_node):
+        self._logger.info("Registering new Arvados node %s", key)
+        record = _ComputeNodeRecord(arvados_node=arv_node)
+        self.arvados_nodes.add(record)
 
     def update_arvados_nodes(self, nodelist):
         self._update_poll_time('arvados_nodes')
         for key, node in self.arvados_nodes.update_from(nodelist):
-            self._logger.info("Registering new Arvados node %s", key)
-            record = _ComputeNodeRecord(arvados_node=node)
-            self.arvados_nodes.add(record)
-        for arv_rec in self.arvados_nodes.unpaired():
-            arv_node = arv_rec.arvados_node
-            for cloud_rec in self.cloud_nodes.unpaired():
-                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
-                    self._pair_nodes(cloud_rec, arv_node)
+            self._register_arvados_node(key, node)
+        self.try_pairing()
+
+    def try_pairing(self):
+        for record in self.cloud_nodes.unpaired():
+            for arv_rec in self.arvados_nodes.unpaired():
+                if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
+                    self._pair_nodes(record, arv_rec.arvados_node)
                     break
 
-    def _nodes_up(self, size):
-        up = 0
-        up += sum(1
-                  for c in self.booting.itervalues()
-                  if size is None or c.cloud_size.get().id == size.id)
-        up += sum(1
-                  for i in (self.booted, self.cloud_nodes.nodes)
-                  for c in i.itervalues()
-                  if size is None or c.cloud_node.size.id == size.id)
+    def _nodes_booting(self, size):
+        s = sum(1
+                for c in self.booting.iterkeys()
+                if size is None or self.sizes_booting[c].id == size.id)
+        return s
+
+    def _node_states(self, size):
+        proxy_states = []
+        states = []
+        for rec in self.cloud_nodes.nodes.itervalues():
+            if size is None or rec.cloud_node.size.id == size.id:
+                if rec.shutdown_actor is None and rec.actor is not None:
+                    proxy_states.append(rec.actor.get_state())
+                else:
+                    states.append("shutdown")
+        return states + pykka.get_all(proxy_states)
+
+    def _update_tracker(self):
+        updates = {
+            k: 0
+            for k in status.tracker.keys()
+            if k.startswith('nodes_')
+        }
+        for s in self._node_states(size=None):
+            updates.setdefault('nodes_'+s, 0)
+            updates['nodes_'+s] += 1
+        updates['nodes_wish'] = len(self.last_wishlist)
+        updates['node_quota'] = self.node_quota
+        status.tracker.update(updates)
+
+    def _state_counts(self, size):
+        states = self._node_states(size)
+        counts = {
+            "booting": self._nodes_booting(size),
+            "unpaired": 0,
+            "busy": 0,
+            "idle": 0,
+            "fail": 0,
+            "down": 0,
+            "shutdown": 0
+        }
+        for s in states:
+            counts[s] = counts[s] + 1
+        return counts
+
+    def _nodes_up(self, counts):
+        up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
         return up
 
     def _total_price(self):
         cost = 0
-        cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
-                    for c in self.booting.itervalues())
-        cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
-                    for i in (self.booted, self.cloud_nodes.nodes)
-                    for c in i.itervalues())
+        cost += sum(self.sizes_booting[c].price
+                    for c in self.booting.iterkeys())
+        cost += sum(c.cloud_node.size.price
+                    for c in self.cloud_nodes.nodes.itervalues())
         return cost
 
-    def _nodes_busy(self, size):
-        return sum(1 for busy in
-                   pykka.get_all(rec.actor.in_state('busy') for rec in
-                                 self.cloud_nodes.nodes.itervalues()
-                                 if rec.cloud_node.size.id == size.id)
-                   if busy)
-
-    def _nodes_missing(self, size):
-        return sum(1 for arv_node in
-                   pykka.get_all(rec.actor.arvados_node for rec in
-                                 self.cloud_nodes.nodes.itervalues()
-                                 if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
-                   if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
-
     def _size_wishlist(self, size):
         return sum(1 for c in self.last_wishlist if c.id == size.id)
 
-    def _size_shutdowns(self, size):
-        sh = 0
-        for c in self.shutdowns.itervalues():
-            try:
-                if c.cloud_node.get().size.id == size.id:
-                    sh += 1
-            except pykka.ActorDeadError:
-                pass
-        return sh
-
     def _nodes_wanted(self, size):
-        total_up_count = self._nodes_up(None)
-        under_min = self.min_nodes - total_up_count
-        over_max = total_up_count - self.max_nodes
+        total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
+        under_min = self.min_nodes - total_node_count
+        over_max = total_node_count - self.node_quota
         total_price = self._total_price()
 
+        counts = self._state_counts(size)
+
+        up_count = self._nodes_up(counts)
+        busy_count = counts["busy"]
+        wishlist_count = self._size_wishlist(size)
+
+        self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.id,
+                          wishlist_count,
+                          up_count,
+                          counts["booting"],
+                          counts["unpaired"],
+                          counts["idle"],
+                          busy_count,
+                          counts["down"]+counts["fail"],
+                          counts["shutdown"])
+
         if over_max >= 0:
             return -over_max
         elif under_min > 0 and size.id == self.min_cloud_size.id:
             return under_min
 
-        up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
-                                           self._nodes_busy(size) +
-                                           self._nodes_missing(size))
-
-        wanted = self._size_wishlist(size) - up_count
+        wanted = wishlist_count - (up_count - busy_count)
         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
             can_boot = int((self.max_total_price - total_price) / size.price)
             if can_boot == 0:
                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
-                                  size.name, size.price, self.max_total_price, total_price)
+                                  size.id, size.price, self.max_total_price, total_price)
             return can_boot
         else:
            return wanted
 
     def _nodes_excess(self, size):
-        up_count = self._nodes_up(size) - self._size_shutdowns(size)
+        counts = self._state_counts(size)
+        up_count = self._nodes_up(counts)
         if size.id == self.min_cloud_size.id:
             up_count -= self.min_nodes
-        return up_count - self._nodes_busy(size) - self._size_wishlist(size)
+        return up_count - (counts["busy"] + self._size_wishlist(size))
 
     def update_server_wishlist(self, wishlist):
         self._update_poll_time('server_wishlist')
-        self.last_wishlist = wishlist
+        requestable_nodes = self.node_quota - (self._nodes_booting(None) + len(self.cloud_nodes))
+        self.last_wishlist = wishlist[:requestable_nodes]
         for size in reversed(self.server_calculator.cloud_sizes):
-            nodes_wanted = self._nodes_wanted(size)
-            if nodes_wanted > 0:
-                self._later.start_node(size)
-            elif (nodes_wanted < 0) and self.booting:
-                self._later.stop_booting_node(size)
+            try:
+                nodes_wanted = self._nodes_wanted(size)
+                if nodes_wanted > 0:
+                    self._later.start_node(size)
+                elif (nodes_wanted < 0) and self.booting:
+                    self._later.stop_booting_node(size)
+            except Exception:
+                self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
+        try:
+            self._update_tracker()
+        except:
+            self._logger.exception("while updating tracker")
 
     def _check_poll_freshness(orig_func):
         """Decorator to inhibit a method when poll information is stale.
@@ -324,36 +393,68 @@
         nodes_wanted = self._nodes_wanted(cloud_size)
         if nodes_wanted < 1:
             return None
-        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
-        self._logger.info("Want %s more nodes. Booting a %s node.",
-                          nodes_wanted, cloud_size.name)
-        new_setup = self._node_setup.start(
-            timer_actor=self._timer,
-            arvados_client=self._new_arvados(),
-            arvados_node=arvados_node,
-            cloud_client=self._new_cloud(),
-            cloud_size=cloud_size).proxy()
-        self.booting[new_setup.actor_ref.actor_urn] = new_setup
-        if arvados_node is not None:
-            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
-                time.time())
-        new_setup.subscribe(self._later.node_up)
+
+        if not self.cancel_node_shutdown(cloud_size):
+            arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
+            self._logger.info("Want %i more %s nodes. Booting a node.",
+                              nodes_wanted, cloud_size.id)
+            new_setup = self._node_setup.start(
+                timer_actor=self._timer,
+                arvados_client=self._new_arvados(),
+                arvados_node=arvados_node,
+                cloud_client=self._new_cloud(),
+                cloud_size=self.server_calculator.find_size(cloud_size.id))
+            self.booting[new_setup.actor_urn] = new_setup.proxy()
+            self.sizes_booting[new_setup.actor_urn] = cloud_size
+
+            if arvados_node is not None:
+                self.arvados_nodes[arvados_node['uuid']].assignment_time = (
+                    time.time())
+            new_setup.tell_proxy().subscribe(self._later.node_setup_finished)
+
         if nodes_wanted > 1:
             self._later.start_node(cloud_size)
 
     def _get_actor_attrs(self, actor, *attr_names):
         return pykka.get_all([getattr(actor, name) for name in attr_names])
 
-    def node_up(self, setup_proxy):
-        cloud_node = setup_proxy.cloud_node.get()
-        del self.booting[setup_proxy.actor_ref.actor_urn]
+    def node_setup_finished(self, setup_proxy):
+        # Called when a SetupActor has completed.
+        cloud_node, arvados_node, error = self._get_actor_attrs(
+            setup_proxy, 'cloud_node', 'arvados_node', 'error')
         setup_proxy.stop()
-        record = self.cloud_nodes.get(cloud_node.id)
-        if record is None:
-            record = self._new_node(cloud_node)
-            self.booted[cloud_node.id] = record
-        self._timer.schedule(time.time() + self.boot_fail_after,
-                             self._later.shutdown_unpaired_node, cloud_node.id)
+
+        if cloud_node is None:
+            # If cloud_node is None then the node create wasn't successful.
+            if error == dispatch.QuotaExceeded:
+                # We've hit a quota limit, so adjust node_quota to stop trying to
+                # boot new nodes until the node count goes down.
+                self.node_quota = len(self.cloud_nodes)
+                self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
+        else:
+            # Node creation succeeded. Update cloud node list.
+            cloud_node._nodemanager_recently_booted = True
+            self._register_cloud_node(cloud_node)
+
+            # Different quota policies may be in force depending on the cloud
+            # provider, account limits, and the specific mix of nodes sizes
+            # that are already created. If we are right at the quota limit,
+            # we want to probe to see if the last quota still applies or if we
+            # are allowed to create more nodes.
+            #
+            # For example, if the quota is actually based on core count, the
+            # quota might be 20 single-core machines or 10 dual-core machines.
+            # If we previously set node_quota to 10 dual core machines, but are
+            # now booting single core machines (actual quota 20), we want to
+            # allow the quota to expand so we don't get stuck at 10 machines
+            # forever.
+            if len(self.cloud_nodes) >= self.node_quota:
+                self.node_quota = len(self.cloud_nodes)+1
+                self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
+
+        self.node_quota = min(self.node_quota, self.max_nodes)
+        del self.booting[setup_proxy.actor_ref.actor_urn]
+        del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
 
     @_check_poll_freshness
     def stop_booting_node(self, size):
@@ -361,54 +462,114 @@
         if (nodes_excess < 1) or not self.booting:
             return None
         for key, node in self.booting.iteritems():
-            if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
-                del self.booting[key]
-                if nodes_excess > 1:
-                    self._later.stop_booting_node(size)
-                break
+            try:
+                if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get(2):
+                    del self.booting[key]
+                    del self.sizes_booting[key]
+                    if nodes_excess > 1:
+                        self._later.stop_booting_node(size)
+                    return
+            except pykka.Timeout:
+                pass
+
+    @_check_poll_freshness
+    def cancel_node_shutdown(self, size):
+        # Go through shutdown actors and see if there are any of the appropriate size that can be cancelled
+        for record in self.cloud_nodes.nodes.itervalues():
+            try:
+                if (record.shutdown_actor is not None and
+                    record.cloud_node.size.id == size.id and
+                    record.shutdown_actor.cancel_shutdown("Node size is in wishlist").get(2)):
+                    return True
+            except (pykka.ActorDeadError, pykka.Timeout) as e:
+                pass
+        return False
 
     def _begin_node_shutdown(self, node_actor, cancellable):
-        cloud_node_id = node_actor.cloud_node.get().id
-        if cloud_node_id in self.shutdowns:
+        cloud_node_obj = node_actor.cloud_node.get()
+        cloud_node_id = cloud_node_obj.id
+        record = self.cloud_nodes[cloud_node_id]
+        if record.shutdown_actor is not None:
             return None
         shutdown = self._node_shutdown.start(
             timer_actor=self._timer, cloud_client=self._new_cloud(),
             arvados_client=self._new_arvados(),
-            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
-        self.shutdowns[cloud_node_id] = shutdown
-        shutdown.subscribe(self._later.node_finished_shutdown)
+            node_monitor=node_actor.actor_ref, cancellable=cancellable)
+        record.shutdown_actor = shutdown.proxy()
+        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
 
     @_check_poll_freshness
     def node_can_shutdown(self, node_actor):
-        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
-            self._begin_node_shutdown(node_actor, cancellable=True)
-
-    def shutdown_unpaired_node(self, cloud_node_id):
-        for record_dict in [self.cloud_nodes, self.booted]:
-            if cloud_node_id in record_dict:
-                record = record_dict[cloud_node_id]
-                break
-        else:
-            return None
-        if not record.actor.in_state('idle', 'busy').get():
-            self._begin_node_shutdown(record.actor, cancellable=False)
+        try:
+            if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
+                self._begin_node_shutdown(node_actor, cancellable=True)
+            elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
+                # Node is unpaired, which means it probably exceeded its booting
+                # grace period without a ping, so shut it down so we can boot a new
+                # node in its place.
+                self._begin_node_shutdown(node_actor, cancellable=False)
+            elif node_actor.in_state('down', 'fail').get():
+                # Node is down and unlikely to come back.
+                self._begin_node_shutdown(node_actor, cancellable=False)
+        except pykka.ActorDeadError as e:
+            # The monitor actor sends shutdown suggestions every time the
+            # node's state is updated, and these go into the daemon actor's
+            # message queue. It's possible that the node has already been shut
+            # down (which shuts down the node monitor actor). In that case,
+            # this message is stale and we'll get ActorDeadError when we try to
+            # access node_actor. Log the error.
+            self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
 
     def node_finished_shutdown(self, shutdown_actor):
-        cloud_node, success, cancel_reason = self._get_actor_attrs(
-            shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
-        shutdown_actor.stop()
+        try:
+            cloud_node, success = self._get_actor_attrs(
+                shutdown_actor, 'cloud_node', 'success')
+        except pykka.ActorDeadError:
+            return
         cloud_node_id = cloud_node.id
+
+        try:
+            shutdown_actor.stop()
+        except pykka.ActorDeadError:
+            pass
+
+        try:
+            record = self.cloud_nodes[cloud_node_id]
+        except KeyError:
+            # Cloud node was already removed from the cloud node list
+            # supposedly while the destroy_node call was finishing its
+            # job.
+            return
+        record.shutdown_actor = None
+
         if not success:
-            if cancel_reason == self._node_shutdown.NODE_BROKEN:
-                self.cloud_nodes.blacklist(cloud_node_id)
-            del self.shutdowns[cloud_node_id]
-        elif cloud_node_id in self.booted:
-            self.booted.pop(cloud_node_id).actor.stop()
-            del self.shutdowns[cloud_node_id]
+            return
+
+        # Shutdown was successful, so stop the monitor actor, otherwise it
+        # will keep offering the node as a candidate for shutdown.
+        record.actor.stop()
+        record.actor = None
+
+        # If the node went from being booted to being shut down without ever
+        # appearing in the cloud node list, it will have the
+        # _nodemanager_recently_booted tag, so get rid of it so that the node
+        # can be forgotten completely.
+        if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
+            del record.cloud_node._nodemanager_recently_booted
 
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
+
+        # Shut down pollers
+        self._server_wishlist_actor.stop()
+        self._arvados_nodes_actor.stop()
+        self._cloud_nodes_actor.stop()
+
+        # Clear cloud node list
+        self.update_cloud_nodes([])
+
+        # Stop setup actors unless they are in the middle of setup.
         setup_stops = {key: node.stop_if_no_cloud_node()
                        for key, node in self.booting.iteritems()}
         self.booting = {key: self.booting[key]