# Initialise the tracker's state: `nodes` and `orphans` both start as empty
# dicts, and the patch adds an empty `_blacklist` set.
def __init__(self):
self.nodes = {}
self.orphans = {}
# Keys added through blacklist(). The response loop in this excerpt skips any
# item whose key is in this set (`if key in self._blacklist: continue`).
+ self._blacklist = set()
# Proxy the methods listed below to self.nodes.
def _proxy_method(name):
# Store `record` in self.nodes under the key returned by
# self.record_key(record). An existing entry with the same key is overwritten.
def add(self, record):
self.nodes[self.record_key(record)] = record
# Add `key` to the self._blacklist set. Adding a key that is already present
# is a no-op (set semantics). Nothing in this excerpt removes keys again.
+ def blacklist(self, key):
+ self._blacklist.add(key)
+
# Set the attribute named by self.RECORD_ATTR on the record stored at
# self.nodes[key] to `item`. The lookup is a plain subscript, so a key that
# is not tracked raises (KeyError, presumably, since nodes is created as a
# dict in __init__ -- confirm no subclass replaces it).
def update_record(self, key, item):
setattr(self.nodes[key], self.RECORD_ATTR, item)
unseen = set(self.nodes.iterkeys())
for item in response:
key = self.item_key(item)
- if key in unseen:
+ if key in self._blacklist:
+ continue
+ elif key in unseen:
unseen.remove(key)
self.update_record(key, item)
else:
timer_actor=self._timer,
arvados_node=None,
poll_stale_after=self.poll_stale_after,
- node_stale_after=self.node_stale_after).proxy()
+ node_stale_after=self.node_stale_after,
+ cloud_client=self._cloud_driver,
+ boot_fail_after=self.boot_fail_after).proxy()
actor.subscribe(self._later.node_can_shutdown)
self._cloud_nodes_actor.subscribe_to(cloud_node.id,
actor.update_cloud_node)
self._pair_nodes(record, arv_rec.arvados_node)
break
for key, record in self.cloud_nodes.orphans.iteritems():
+ if key in self.shutdowns:
+ try:
+ self.shutdowns[key].stop().get()
+ except pykka.ActorDeadError:
+ pass
+ del self.shutdowns[key]
record.actor.stop()
record.cloud_node = None
- self.shutdowns.pop(key, None)
def update_arvados_nodes(self, nodelist):
self._update_poll_time('arvados_nodes')
self.cloud_nodes.nodes.itervalues())
if busy)
# Return how many tracked cloud node records have an Arvados node that
# cnode.arvados_node_missing() reports as missing, given
# self.node_stale_after.
#
# Records whose cloud node id is a key in self.shutdowns are left out before
# any Arvados node is fetched. Falsy Arvados node values are not counted.
# NOTE(review): `cnode` is not defined in this excerpt -- presumably a
# module imported at file level; confirm.
+ def _nodes_missing(self):
+ return sum(1 for arv_node in
# pykka.get_all() collects the results of the arvados_node lookups requested
# from each record's actor (assumes rec.actor is a pykka proxy whose
# attribute access returns a future -- TODO confirm).
+ pykka.get_all(rec.actor.arvados_node for rec in
+ self.cloud_nodes.nodes.itervalues()
# .get() waits for the cloud_node value so that its id can be tested.
+ if rec.actor.cloud_node.get().id not in self.shutdowns)
+ if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
+
def _nodes_wanted(self):
up_count = self._nodes_up()
under_min = self.min_nodes - up_count
elif under_min > 0:
return under_min
else:
- up_count -= len(self.shutdowns) + self._nodes_busy()
+ up_count -= len(self.shutdowns) + self._nodes_busy() + self._nodes_missing()
return len(self.last_wishlist) - up_count
def _nodes_excess(self):
return None
shutdown = self._node_shutdown.start(
timer_actor=self._timer, cloud_client=self._new_cloud(),
+ arvados_client=self._new_arvados(),
node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
self.shutdowns[cloud_node_id] = shutdown
shutdown.subscribe(self._later.node_finished_shutdown)
self._begin_node_shutdown(record.actor, cancellable=False)
def node_finished_shutdown(self, shutdown_actor):
- success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
- 'cloud_node')
+ cloud_node, success, cancel_reason = self._get_actor_attrs(
+ shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
shutdown_actor.stop()
cloud_node_id = cloud_node.id
if not success:
+ if cancel_reason == self._node_shutdown.NODE_BROKEN:
+ self.cloud_nodes.blacklist(cloud_node_id)
del self.shutdowns[cloud_node_id]
elif cloud_node_id in self.booted:
self.booted.pop(cloud_node_id).actor.stop()