super(ComputeNodeStateChangeBase, self).__init__()
RetryMixin.__init__(self, retry_wait, max_retry_wait,
None, cloud_client, timer_actor)
- self._later = self.actor_ref.proxy()
+ self._later = self.actor_ref.tell_proxy()
self._arvados = arvados_client
self.subscribers = set()
self._set_logger()
def _finished(self):
- _notify_subscribers(self._later, self.subscribers)
+ if self.subscribers is None:
+ raise Exception("Actor tried to finish twice")
+ _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
self.subscribers = None
self._logger.info("finished")
def subscribe(self, subscriber):
if self.subscribers is None:
try:
- subscriber(self._later)
+ subscriber(self.actor_ref.proxy())
except pykka.ActorDeadError:
pass
else:
if not self._cloud.destroy_node(self.cloud_node):
if self._cloud.broken(self.cloud_node):
self._later.cancel_shutdown(self.NODE_BROKEN)
+ return
else:
# Force a retry.
raise cloud_types.LibcloudError("destroy_node failed")
this to perform maintenance tasks on themselves. Having a
dedicated actor for this gives us the opportunity to control the
flow of requests; e.g., by backing off when errors occur.
-
- This actor is most like a "traditional" Pykka actor: there's no
- subscribing, but instead methods return real driver results. If
- you're interested in those results, you should get them from the
- Future that the proxy method returns. Be prepared to handle exceptions
- from the cloud driver when you do.
"""
def __init__(self, cloud_factory, max_retry_wait=180):
super(ComputeNodeUpdateActor, self).__init__()
self.error_streak = 0
self.next_request_time = time.time()
+ def _set_logger(self):
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+
+ def on_start(self):
+ self._set_logger()
+
def _throttle_errors(orig_func):
@functools.wraps(orig_func)
def throttle_wrapper(self, *args, **kwargs):
try:
result = orig_func(self, *args, **kwargs)
except Exception as error:
- self.error_streak += 1
- self.next_request_time += min(2 ** self.error_streak,
- self.max_retry_wait)
- raise
+ if self._cloud.is_cloud_exception(error):
+ self.error_streak += 1
+ self.next_request_time += min(2 ** self.error_streak,
+ self.max_retry_wait)
+ self._logger.warn(
+ "Unhandled exception: %s", error, exc_info=error)
else:
self.error_streak = 0
return result
boot_fail_after=1800
):
super(ComputeNodeMonitorActor, self).__init__()
- self._later = self.actor_ref.proxy()
+ self._later = self.actor_ref.tell_proxy()
self._last_log = None
self._shutdowns = shutdown_timer
self._cloud_node_fqdn = cloud_fqdn_func
eligible = self.shutdown_eligible()
if eligible is True:
self._debug("Suggesting shutdown.")
- _notify_subscribers(self._later, self.subscribers)
+ _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
elif self._shutdowns.window_open():
self._debug("Cannot shut down because %s", eligible)
elif self.last_shutdown_opening != next_opening:
first_ping_s = arvados_node.get('first_ping_at')
if (self.arvados_node is not None) or (not first_ping_s):
return None
- elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
+ elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
(arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
self._later.update_arvados_node(arvados_node)
return self.cloud_node.id