item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
def find_stale_node(self, stale_time):
- for record in self.nodes.itervalues():
+ # Try to select a stale node record that has an assigned slot first
+ for record in sorted(self.nodes.itervalues(),
+ key=lambda r: r.arvados_node['slot_number'],
+ reverse=True):
node = record.arvados_node
if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
stale_time) and
node_setup_class=dispatch.ComputeNodeSetupActor,
node_shutdown_class=dispatch.ComputeNodeShutdownActor,
node_actor_class=dispatch.ComputeNodeMonitorActor,
- max_total_price=0):
+ max_total_price=0,
+ consecutive_idle_count=1):
super(NodeManagerDaemonActor, self).__init__()
self._node_setup = node_setup_class
self._node_shutdown = node_shutdown_class
self.poll_stale_after = poll_stale_after
self.boot_fail_after = boot_fail_after
self.node_stale_after = node_stale_after
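+ # How many consecutive times a node must report as idle before it is
+ # considered eligible for shutdown (forwarded to each ComputeNodeMonitorActor).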
+ self.consecutive_idle_count = consecutive_idle_count
self.last_polls = {}
for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
poll_actor = locals()[poll_name + '_actor']
cloud_node=cloud_node,
cloud_node_start_time=start_time,
shutdown_timer=shutdown_timer,
- cloud_fqdn_func=self._cloud_driver.node_fqdn,
update_actor=self._cloud_updater,
timer_actor=self._timer,
arvados_node=None,
poll_stale_after=self.poll_stale_after,
node_stale_after=self.node_stale_after,
cloud_client=self._cloud_driver,
- boot_fail_after=self.boot_fail_after)
+ boot_fail_after=self.boot_fail_after,
+ consecutive_idle_count=self.consecutive_idle_count)
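+ # tell_proxy() returns a proxy whose calls are sent as one-way messages,
+ # so the subscribe below does not block waiting for the actor's reply.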
actorTell = actor.tell_proxy()
actorTell.subscribe(self._later.node_can_shutdown)
self._cloud_nodes_actor.subscribe_to(cloud_node.id,
if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
self.cloud_nodes.add(record)
else:
- # Node disappeared from the cloud node list. Stop the monitor
- # actor if necessary and forget about the node.
+ # Node disappeared from the cloud node list. If it's paired,
+ # remove its idle time counter.
+ if record.arvados_node:
+ status.tracker.idle_out(record.arvados_node.get('hostname'))
+ # Stop the monitor actor if necessary and forget about the node.
if record.actor:
try:
record.actor.stop()
updates.setdefault('nodes_'+s, 0)
updates['nodes_'+s] += 1
updates['nodes_wish'] = len(self.last_wishlist)
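+ # Publish the current node quota alongside the per-state node counts.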
+ updates['node_quota'] = self.node_quota
status.tracker.update(updates)
def _state_counts(self, size):
"unpaired": 0,
"busy": 0,
"idle": 0,
+ "fail": 0,
"down": 0,
"shutdown": 0
}
busy_count = counts["busy"]
wishlist_count = self._size_wishlist(size)
- self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
+ self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.id,
wishlist_count,
up_count,
counts["booting"],
counts["unpaired"],
counts["idle"],
busy_count,
- counts["down"],
+ counts["down"]+counts["fail"],
counts["shutdown"])
if over_max >= 0:
can_boot = int((self.max_total_price - total_price) / size.price)
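+ # e.g. max_total_price=10.0, total_price=8.5, size.price=0.5 -> can_boot == 3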
if can_boot == 0:
self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
- size.name, size.price, self.max_total_price, total_price)
+ size.id, size.price, self.max_total_price, total_price)
return can_boot
else:
return wanted
def update_server_wishlist(self, wishlist):
self._update_poll_time('server_wishlist')
- self.last_wishlist = wishlist
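+ # Never request more nodes than the quota allows, counting nodes that
+ # are already up or still booting.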
+ requestable_nodes = self.node_quota - (self._nodes_booting(None) + len(self.cloud_nodes))
+ self.last_wishlist = wishlist[:requestable_nodes]
for size in reversed(self.server_calculator.cloud_sizes):
try:
nodes_wanted = self._nodes_wanted(size)
self._later.start_node(size)
elif (nodes_wanted < 0) and self.booting:
self._later.stop_booting_node(size)
- except Exception as e:
+ except Exception:
self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
try:
self._update_tracker()
nodes_wanted = self._nodes_wanted(cloud_size)
if nodes_wanted < 1:
return None
- arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
- self._logger.info("Want %i more %s nodes. Booting a node.",
- nodes_wanted, cloud_size.name)
- new_setup = self._node_setup.start(
- timer_actor=self._timer,
- arvados_client=self._new_arvados(),
- arvados_node=arvados_node,
- cloud_client=self._new_cloud(),
- cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
- self.booting[new_setup.actor_ref.actor_urn] = new_setup
- self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
-
- if arvados_node is not None:
- self.arvados_nodes[arvados_node['uuid']].assignment_time = (
- time.time())
- new_setup.subscribe(self._later.node_setup_finished)
+
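+ # Before booting a new node, try to reclaim one of this size that is
+ # pending shutdown; if a shutdown can be cancelled, no boot is needed.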
+ if not self.cancel_node_shutdown(cloud_size):
+ arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
+ self._logger.info("Want %i more %s nodes. Booting a node.",
+ nodes_wanted, cloud_size.id)
+ new_setup = self._node_setup.start(
+ timer_actor=self._timer,
+ arvados_client=self._new_arvados(),
+ arvados_node=arvados_node,
+ cloud_client=self._new_cloud(),
+ cloud_size=self.server_calculator.find_size(cloud_size.id))
+ self.booting[new_setup.actor_urn] = new_setup.proxy()
+ self.sizes_booting[new_setup.actor_urn] = cloud_size
+
+ if arvados_node is not None:
+ self.arvados_nodes[arvados_node['uuid']].assignment_time = (
+ time.time())
+ new_setup.tell_proxy().subscribe(self._later.node_setup_finished)
+
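+ # Each start_node call boots (or reclaims) at most one node; queue
+ # another pass if more nodes of this size are still wanted.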
if nodes_wanted > 1:
self._later.start_node(cloud_size)
if (nodes_excess < 1) or not self.booting:
return None
for key, node in self.booting.iteritems():
- if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
- del self.booting[key]
- del self.sizes_booting[key]
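+ # Wait at most two seconds per setup actor; actors that do not answer
+ # in time are skipped instead of blocking the daemon.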
+ try:
+ if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get(2):
+ del self.booting[key]
+ del self.sizes_booting[key]
+ if nodes_excess > 1:
+ self._later.stop_booting_node(size)
+ return
+ except pykka.Timeout:
+ pass
- if nodes_excess > 1:
- self._later.stop_booting_node(size)
- break
+ @_check_poll_freshness
+ def cancel_node_shutdown(self, size):
+ # Go through the shutdown actors and cancel the first one of the
+ # appropriate size; return True if a shutdown was cancelled.
+ for record in self.cloud_nodes.nodes.itervalues():
+ try:
+ if (record.shutdown_actor is not None and
+ record.cloud_node.size.id == size.id and
+ record.shutdown_actor.cancel_shutdown("Node size is in wishlist").get(2)):
+ return True
+ except (pykka.ActorDeadError, pykka.Timeout):
+ pass
+ return False
def _begin_node_shutdown(self, node_actor, cancellable):
cloud_node_obj = node_actor.cloud_node.get()
# grace period without a ping, so shut it down so we can boot a new
# node in its place.
self._begin_node_shutdown(node_actor, cancellable=False)
- elif node_actor.in_state('down').get():
- # Node is down and unlikely to come back.
+ elif node_actor.in_state('down', 'fail').get():
+ # Node is down or failed and unlikely to come back.
self._begin_node_shutdown(node_actor, cancellable=False)
except pykka.ActorDeadError as e: