-
-
class ComputeNodeMonitorActor(config.actor_class):
    """Actor that watches over a single running compute node.

    The actor is told about changes to the node's cloud record and to its
    paired Arvados record.  From that information it decides whether the
    node may be shut down, and tells its subscribers when it may.
    """

    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
                 timer_actor, update_actor, arvados_node=None,
                 poll_stale_after=600, node_stale_after=3600):
        """Set up monitoring state and queue the initial pairing/check.

        Arguments:
        * cloud_node: the cloud record for the node.  This class reads its
          `id`, `name` and `private_ips` attributes.
        * cloud_node_start_time: start timestamp of the cloud node.  It is
          used for the eligibility check while no Arvados record is paired.
        * shutdown_timer: object providing `next_opening()` and
          `window_open()`.
        * timer_actor: object whose `schedule(when, callback)` is used to
          re-run `consider_shutdown` at the next window opening.
        * update_actor: object whose `sync_node(cloud_node, arvados_node)`
          is called when the Arvados FQDN differs from the cloud node name.
        * arvados_node: optional Arvados node record to pair with.  It is
          handed to `update_arvados_node` through the actor proxy.
        * poll_stale_after, node_stale_after: age limits passed to
          `timestamp_fresh`.  The unit is presumably seconds; confirm
          against `timestamp_fresh`.
        """
        super(ComputeNodeMonitorActor, self).__init__()
        # Calls made through this proxy are sent to this actor rather than
        # run inline (presumably queued as messages; confirm against the
        # actor library in use).
        self._later = self.actor_ref.proxy()

        # Logging state used by _debug() to drop repeated messages.
        self._logger = logging.getLogger('arvnodeman.computenode')
        self._last_log = None

        # Collaborators.
        self._shutdowns = shutdown_timer
        self._timer = timer_actor
        self._update = update_actor

        # Public node state.
        self.cloud_node = cloud_node
        self.cloud_node_start_time = cloud_node_start_time
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.subscribers = set()
        self.arvados_node = None

        # Pair first, then evaluate shutdown.  The order of these proxied
        # calls is intentional.
        self._later.update_arvados_node(arvados_node)
        self.last_shutdown_opening = None
        self._later.consider_shutdown()

    def subscribe(self, subscriber):
        """Add *subscriber* to the set notified about shutdown eligibility."""
        self.subscribers.add(subscriber)

    def _debug(self, msg, *args):
        """Emit a debug log record unless *msg* matches the last template.

        Only the format string is compared, not the arguments.  A repeat of
        the same template with different arguments is therefore suppressed.
        """
        if msg != self._last_log:
            self._last_log = msg
            self._logger.debug(msg, *args)

    def _shutdown_eligible(self):
        """Return whether the node currently qualifies for shutdown.

        Without a paired Arvados record, the answer is whether the cloud
        start time is still "fresh" within `node_stale_after`.  With a
        record, the record's mtime must be fresh within `poll_stale_after`
        and its `info` must report a `slurm_state` of 'idle'.
        """
        record = self.arvados_node
        if record is None:
            return timestamp_fresh(self.cloud_node_start_time,
                                   self.node_stale_after)
        recently_polled = timestamp_fresh(arvados_node_mtime(record),
                                          self.poll_stale_after)
        return (recently_polled and
                record['info'].get('slurm_state') == 'idle')

    def consider_shutdown(self):
        """Check eligibility now and make sure a re-check is scheduled.

        If the shutdown window is open and the node is eligible, all
        subscribers are notified.  The timer is asked to call this method
        again at the next window opening, but only when that opening time
        differs from the one already scheduled.
        """
        upcoming = self._shutdowns.next_opening()
        node_id = self.cloud_node.id
        if not self._shutdowns.window_open():
            self._debug("Node %s shutdown window closed. Next at %s.",
                        node_id, time.ctime(upcoming))
        elif self._shutdown_eligible():
            self._debug("Node %s suggesting shutdown.", node_id)
            _notify_subscribers(self._later, self.subscribers)
        else:
            self._debug("Node %s shutdown window open but node busy.",
                        node_id)

        if upcoming != self.last_shutdown_opening:
            self._timer.schedule(upcoming, self._later.consider_shutdown)
            self.last_shutdown_opening = upcoming

    def offer_arvados_pair(self, arvados_node):
        """Accept *arvados_node* as this node's pair if its IP matches.

        The offer succeeds only when the node is not yet paired and the
        record's `ip_address` is among the cloud node's `private_ips`.  The
        record itself is applied through a proxied `update_arvados_node`
        call.

        Returns the cloud node id on success, otherwise None.
        """
        unpaired = self.arvados_node is None
        if unpaired and arvados_node['ip_address'] in self.cloud_node.private_ips:
            self._later.update_arvados_node(arvados_node)
            return self.cloud_node.id
        return None

    def update_cloud_node(self, cloud_node):
        """Replace the cloud record (ignoring None) and re-check shutdown."""
        if cloud_node is not None:
            self.cloud_node = cloud_node
        self._later.consider_shutdown()

    def update_arvados_node(self, arvados_node):
        """Store a new Arvados record (ignoring None) and re-check shutdown.

        When the record's FQDN (per `arvados_node_fqdn`) does not match the
        cloud node's name, the update actor is asked to sync the two.
        """
        if arvados_node is not None:
            self.arvados_node = arvados_node
            if arvados_node_fqdn(arvados_node) != self.cloud_node.name:
                self._update.sync_node(self.cloud_node, arvados_node)
        self._later.consider_shutdown()