8816: ComputeNodeUpdateActor._throttle_errors logs errors instead of re-throwing...
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
index 2ae4fc8923612d474b833fcf9f345b255148ee3d..42960efeba4d758471710ece25919d4b7a915df2 100644 (file)
@@ -26,7 +26,7 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
         super(ComputeNodeStateChangeBase, self).__init__()
         RetryMixin.__init__(self, retry_wait, max_retry_wait,
                             None, cloud_client, timer_actor)
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self._arvados = arvados_client
         self.subscribers = set()
 
@@ -37,14 +37,16 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
         self._set_logger()
 
     def _finished(self):
-        _notify_subscribers(self._later, self.subscribers)
+        if self.subscribers is None:
+            raise Exception("Actor tried to finish twice")
+        _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
         self.subscribers = None
         self._logger.info("finished")
 
     def subscribe(self, subscriber):
         if self.subscribers is None:
             try:
-                subscriber(self._later)
+                subscriber(self.actor_ref.proxy())
             except pykka.ActorDeadError:
                 pass
         else:
@@ -225,6 +227,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         if not self._cloud.destroy_node(self.cloud_node):
             if self._cloud.broken(self.cloud_node):
                 self._later.cancel_shutdown(self.NODE_BROKEN)
+                return
             else:
                 # Force a retry.
                 raise cloud_types.LibcloudError("destroy_node failed")
@@ -253,12 +256,6 @@ class ComputeNodeUpdateActor(config.actor_class):
     this to perform maintenance tasks on themselves.  Having a
     dedicated actor for this gives us the opportunity to control the
     flow of requests; e.g., by backing off when errors occur.
-
-    This actor is most like a "traditional" Pykka actor: there's no
-    subscribing, but instead methods return real driver results.  If
-    you're interested in those results, you should get them from the
-    Future that the proxy method returns.  Be prepared to handle exceptions
-    from the cloud driver when you do.
     """
     def __init__(self, cloud_factory, max_retry_wait=180):
         super(ComputeNodeUpdateActor, self).__init__()
@@ -267,6 +264,12 @@ class ComputeNodeUpdateActor(config.actor_class):
         self.error_streak = 0
         self.next_request_time = time.time()
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+
+    def on_start(self):
+        self._set_logger()
+
     def _throttle_errors(orig_func):
         @functools.wraps(orig_func)
         def throttle_wrapper(self, *args, **kwargs):
@@ -280,7 +283,9 @@ class ComputeNodeUpdateActor(config.actor_class):
                 self.error_streak += 1
                 self.next_request_time += min(2 ** self.error_streak,
                                               self.max_retry_wait)
-                raise
+                self._logger.error(
+                    "Caught unknown error (no retry): %s",
+                    error, exc_info=error)
             else:
                 self.error_streak = 0
                 return result
@@ -304,7 +309,7 @@ class ComputeNodeMonitorActor(config.actor_class):
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self._last_log = None
         self._shutdowns = shutdown_timer
         self._cloud_node_fqdn = cloud_fqdn_func
@@ -388,7 +393,7 @@ class ComputeNodeMonitorActor(config.actor_class):
             eligible = self.shutdown_eligible()
             if eligible is True:
                 self._debug("Suggesting shutdown.")
-                _notify_subscribers(self._later, self.subscribers)
+                _notify_subscribers(self.actor_ref.proxy(), self.subscribers)
             elif self._shutdowns.window_open():
                 self._debug("Cannot shut down because %s", eligible)
             elif self.last_shutdown_opening != next_opening:
@@ -403,7 +408,7 @@ class ComputeNodeMonitorActor(config.actor_class):
         first_ping_s = arvados_node.get('first_ping_at')
         if (self.arvados_node is not None) or (not first_ping_s):
             return None
-        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
+        elif ((arvados_node['info'].get('ec2_instance_id') == self._cloud.node_id(self.cloud_node)) and
               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
             self._later.update_arvados_node(arvados_node)
             return self.cloud_node.id