def __init__(self):
    """Initialize an empty node tracker."""
    # Records we are actively tracking, indexed by record key.
    self.nodes = {}
    # NOTE(review): orphans is never touched in the visible code -- presumably
    # records that lost their pairing; confirm against the full file.
    self.orphans = {}
    # The '-'-marked _blacklist set from the pasted diff is dropped here: the
    # blacklist() helper and its only reader are deleted by the same patch.
# Proxy the methods listed below to self.nodes.
# NOTE(review): fragment -- the body of _proxy_method is not visible in this
# paste; presumably it builds a forwarding method by *name*. Confirm against
# the full file before editing.
def _proxy_method(name):
def add(self, record):
    """Start tracking *record*, indexed under its record key.

    Overwrites any existing record stored under the same key.
    """
    self.nodes[self.record_key(record)] = record
# NOTE(review): the lines below carry '-' patch markers -- this paste is
# diff residue, and blacklist() is being deleted along with the _blacklist
# set it updates (also '-'-marked in __init__ and in the response loop).
- def blacklist(self, key):
- self._blacklist.add(key)
-
def update_record(self, key, item):
    """Store *item* on the record tracked under *key*.

    The attribute written is named by the class-level RECORD_ATTR
    (e.g. 'cloud_node' or 'arvados_node' in the subclasses).
    Raises KeyError if *key* is not tracked.
    """
    setattr(self.nodes[key], self.RECORD_ATTR, item)
# NOTE(review): spliced patch fragment. The loop below belongs to an
# update-from-response method whose `def` line is not visible. The '-'/'+'
# markers drop the _blacklist check, so every key found in `response` is
# processed. The `else:` branch is cut off mid-splice -- the generator
# expression after it is actually the body of a separate unpaired() accessor.
unseen = set(self.nodes.iterkeys())
for item in response:
key = self.item_key(item)
- if key in self._blacklist:
- continue
- elif key in unseen:
+ if key in unseen:
# Key already tracked: mark it seen and refresh the stored item.
unseen.remove(key)
self.update_record(key, item)
else:
# unpaired(): yield tracked records whose PAIR_ATTR is still unset.
return (record for record in self.nodes.itervalues()
if getattr(record, self.PAIR_ATTR) is None)
# NOTE(review): '-'-marked -- the patch deletes paired(), the counterpart of
# unpaired(); it yielded records whose PAIR_ATTR was already set.
- def paired(self):
- return (record for record in self.nodes.itervalues()
- if getattr(record, self.PAIR_ATTR) is not None)
-
# Tracker specialization for cloud-provider node records.
class _CloudNodeTracker(_BaseNodeTracker):
RECORD_ATTR = 'cloud_node'
# NOTE(review): splice -- the code below belongs to a different method (cloud
# node list synchronization), not to this class body position. A record whose
# cloud node vanished from the provider listing is either kept (if it was
# recently booted and may just not be listed yet) or torn down.
if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
self.cloud_nodes.add(record)
else:
# The patch replaces the bare stop() with a guarded version: the monitor
# actor may be absent or already dead, so tolerate both before forgetting
# the node.
- record.actor.stop()
+ # Node disappeared from the cloud node list. Stop the monitor
+ # actor if necessary and forget about the node.
+ if record.actor:
+ try:
+ record.actor.stop()
+ except pykka.ActorDeadError:
+ pass
+ record.actor = None
record.cloud_node = None
# NOTE(review): truncated -- the body of _register_arvados_node is not
# visible in this paste.
def _register_arvados_node(self, key, arv_node):
def try_pairing(self):
    """Try to pair each unpaired cloud node with an unpaired Arvados node.

    For every unpaired cloud record, offer each unpaired Arvados node to the
    cloud node's monitor actor; on the first accepted offer, record the pair
    and move on to the next cloud record. Records whose monitor actor is gone
    (record.actor is None) are skipped -- this is the '+'-marked guard from
    the pasted diff, which avoids calling through a missing actor proxy.
    """
    for record in self.cloud_nodes.unpaired():
        for arv_rec in self.arvados_nodes.unpaired():
            if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                self._pair_nodes(record, arv_rec.arvados_node)
                break
# NOTE(review): stray tail of a truncated method -- its def line and the
# construction of `s` are not visible here.
return s
def _node_states(self, size):
    """Return the state strings of tracked cloud nodes.

    If *size* is not None, only records whose cloud node matches that size
    are considered. Records that already have a shutdown actor, or whose
    monitor actor is gone, are reported as 'shutdown'; the rest are asked
    for their state through their monitor actor proxies.

    This is the '+'-marked post-patch version: one pass over the records
    instead of two, and get_state() is never called on a record without a
    live monitor actor.
    """
    proxy_states = []
    states = []
    for rec in self.cloud_nodes.nodes.itervalues():
        if size is None or rec.cloud_node.size.id == size.id:
            if rec.shutdown_actor is None and rec.actor is not None:
                # Collect the pykka futures first; resolve them in one batch
                # below so the actor calls overlap.
                proxy_states.append(rec.actor.get_state())
            else:
                states.append("shutdown")
    return states + pykka.get_all(proxy_states)
def _state_counts(self, size):
# NOTE(review): truncated -- only the first statement is visible; the
# aggregation of `states` into per-state counts is cut off by the splice.
states = self._node_states(size)
def _total_price(self):
    """Return the total hourly price of nodes booting plus nodes running.

    Post-patch ('+') version from the pasted diff: the price is read
    directly off the size object, dropping the redundant
    server_calculator.find_size() round trips of the '-' lines.
    """
    cost = 0
    # Nodes still booting are priced by the size they were requested at.
    cost += sum(self.sizes_booting[c].price
                for c in self.booting.iterkeys())
    # Running nodes are priced by their actual cloud size.
    cost += sum(c.cloud_node.size.price
                for c in self.cloud_nodes.nodes.itervalues())
    return cost
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
    """Decide whether the node monitored by *node_actor* may be shut down.

    Begins a shutdown when the node is surplus for its size (cancellable),
    unpaired past its boot grace period, or down (both non-cancellable).
    This is the '+'-marked post-patch version: the debug print() calls are
    gone and the whole decision is wrapped in an ActorDeadError guard.
    """
    try:
        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
            # More nodes of this size than needed; start a shutdown, but
            # allow it to be cancelled if demand comes back.
            self._begin_node_shutdown(node_actor, cancellable=True)
        elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
            # Node is unpaired, which means it probably exceeded its booting
            # grace period without a ping, so shut it down so we can boot a new
            # node in its place.
            self._begin_node_shutdown(node_actor, cancellable=False)
        elif node_actor.in_state('down').get():
            # Node is down and unlikely to come back.
            self._begin_node_shutdown(node_actor, cancellable=False)
    except pykka.ActorDeadError as e:
        # The monitor actor sends shutdown suggestions every time the
        # node's state is updated, and these go into the daemon actor's
        # message queue. It's possible that the node has already been shut
        # down (which shuts down the node monitor actor). In that case,
        # this message is stale and we'll get ActorDeadError when we try to
        # access node_actor. Log the error.
        self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
def node_finished_shutdown(self, shutdown_actor):
    """Handle completion of a node shutdown attempt.

    Always stops and forgets the shutdown actor. On success, also stops the
    node's monitor actor (so it cannot keep offering the node for shutdown)
    and clears the recently-booted marker so the node can be forgotten.
    This is the '+'-marked post-patch version: the cancel_reason /
    blacklist branch of the '-' lines is removed along with the blacklist.
    """
    try:
        cloud_node, success = self._get_actor_attrs(
            shutdown_actor, 'cloud_node', 'success')
    except pykka.ActorDeadError:
        # Stale message: the shutdown actor is already gone.
        return
    cloud_node_id = cloud_node.id
    record = self.cloud_nodes[cloud_node_id]
    shutdown_actor.stop()
    record.shutdown_actor = None

    if not success:
        # Shutdown was cancelled or failed; keep monitoring the node.
        return

    # Shutdown was successful, so stop the monitor actor, otherwise it
    # will keep offering the node as a candidate for shutdown.
    record.actor.stop()
    record.actor = None

    # If the node went from being booted to being shut down without ever
    # appearing in the cloud node list, it will have the
    # _nodemanager_recently_booted tag, so get rid of it so that the node
    # can be forgotten completely.
    if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
        del record.cloud_node._nodemanager_recently_booted
def shutdown(self):
self._logger.info("Shutting down after signal.")