Merge branch 'master' into 5383-api-db-current-time
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
index c79d8f9588fbea18e276fd1e8f30474b3ac7677e..70817627dfe8d3194435a7a31f1df8e330e37ed5 100644 (file)
@@ -9,7 +9,8 @@ import time
 import libcloud.common.types as cloud_types
 import pykka
 
-from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
+from .. import \
+    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh
 from ...clientactor import _notify_subscribers
 from ... import config
 
@@ -19,32 +20,38 @@ class ComputeNodeStateChangeBase(config.actor_class):
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
+    def __init__(self, logger_name, cloud_client, timer_actor,
+                 retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
         self._later = self.actor_ref.proxy()
-        self._timer = timer_actor
         self._logger = logging.getLogger(logger_name)
+        self._cloud = cloud_client
+        self._timer = timer_actor
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
         self.retry_wait = retry_wait
         self.subscribers = set()
 
     @staticmethod
-    def _retry(errors):
+    def _retry(errors=()):
         """Retry decorator for an actor method that makes remote requests.
 
        Use this function to decorate an actor method, and pass in a
         tuple of exceptions to catch.  This decorator will schedule
         retries of that method with exponential backoff if the
-        original method raises any of the given errors.
+        original method raises a known cloud driver error, or any of the
+        given exception types.
         """
         def decorator(orig_func):
             @functools.wraps(orig_func)
-            def wrapper(self, *args, **kwargs):
+            def retry_wrapper(self, *args, **kwargs):
                 start_time = time.time()
                 try:
                     orig_func(self, *args, **kwargs)
-                except errors as error:
+                except Exception as error:
+                    if not (isinstance(error, errors) or
+                            self._cloud.is_cloud_exception(error)):
+                        raise
                     self._logger.warning(
                         "Client error: %s - waiting %s seconds",
                         error, self.retry_wait)
@@ -56,7 +63,7 @@ class ComputeNodeStateChangeBase(config.actor_class):
                                           self.max_retry_wait)
                 else:
                     self.retry_wait = self.min_retry_wait
-            return wrapper
+            return retry_wrapper
         return decorator
 
     def _finished(self):
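
Taken together, the changes above make _retry catch every exception, re-raise anything that is neither in the caller-supplied tuple nor recognized by the cloud client's is_cloud_exception(), and reschedule the method through the timer with a wait that doubles up to max_retry_wait. A minimal blocking sketch of the same backoff pattern, with hypothetical standalone names (the real code schedules a timer message instead of sleeping):

    import time

    def retry_with_backoff(func, should_retry, min_wait=1, max_wait=180):
        # Double the wait after each retryable failure, capped at max_wait;
        # anything should_retry() rejects propagates immediately.
        wait = min_wait
        while True:
            try:
                return func()
            except Exception as error:
                if not should_retry(error):
                    raise
                time.sleep(wait)
                wait = min(wait * 2, max_wait)
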
@@ -86,9 +93,9 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
+            'arvnodeman.nodeup', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
         self._arvados = arvados_client
-        self._cloud = cloud_client
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
@@ -97,12 +104,12 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def prepare_arvados_node(self, node):
         self.arvados_node = self._arvados.nodes().update(
             uuid=node['uuid'],
@@ -116,13 +123,19 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
             ).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def create_cloud_node(self):
         self._logger.info("Creating cloud node with size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
+        self._later.post_create()
+
+    @ComputeNodeStateChangeBase._retry()
+    def post_create(self):
+        self._cloud.post_create_node(self.cloud_node)
+        self._logger.info("%s post-create work done.", self.cloud_node.id)
         self._finished()
 
     def stop_if_no_cloud_node(self):
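
Splitting the cloud-side work into create_cloud_node and a separate post_create step, each retried independently, means a transient failure after the node exists re-runs only the post-create call, never create_node, so retries cannot produce duplicate nodes. A sketch of that step-granular idea, reusing retry_with_backoff from the sketch above (driver is a hypothetical stand-in for the cloud client):

    def bring_up_node(driver, size, arvados_node):
        # create_node runs exactly once; only the post-create work is
        # retried, so a flaky tagging API cannot yield duplicate nodes.
        node = driver.create_node(size, arvados_node)
        retry_with_backoff(lambda: driver.post_create_node(node),
                           should_retry=driver.is_cloud_exception)
        return node
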
@@ -143,8 +156,8 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
-        self._cloud = cloud_client
+            'arvnodeman.nodedown', cloud_client, timer_actor,
+            retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
         self.cancellable = cancellable
@@ -159,7 +172,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
+        def stop_wrapper(self, *args, **kwargs):
             if (self.cancellable and
                   (not self._monitor.shutdown_eligible().get())):
                 self._logger.info(
@@ -169,10 +182,10 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
                 return None
             else:
                 return orig_func(self, *args, **kwargs)
-        return wrapper
+        return stop_wrapper
 
     @_stop_if_window_closed
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
+    @ComputeNodeStateChangeBase._retry()
     def shutdown_node(self):
         if self._cloud.destroy_node(self.cloud_node):
             self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
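
Note the decorator order on shutdown_node above: _stop_if_window_closed wraps the retry wrapper, and scheduled retries re-enter through the actor proxy's public method, so the eligibility check runs again before every retry, not only the first attempt. A toy version of the guard, assuming a hypothetical eligible() method:

    import functools

    def stop_if_window_closed(orig_func):
        # Toy guard: cancellable shutdowns bail out whenever the node
        # has become ineligible again; forced shutdowns always proceed.
        @functools.wraps(orig_func)
        def stop_wrapper(self, *args, **kwargs):
            if self.cancellable and not self.eligible():
                return None
            return orig_func(self, *args, **kwargs)
        return stop_wrapper
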
@@ -210,14 +223,14 @@ class ComputeNodeUpdateActor(config.actor_class):
 
     def _throttle_errors(orig_func):
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
+        def throttle_wrapper(self, *args, **kwargs):
             throttle_time = self.next_request_time - time.time()
             if throttle_time > 0:
                 time.sleep(throttle_time)
             self.next_request_time = time.time()
             try:
                 result = orig_func(self, *args, **kwargs)
-            except config.CLOUD_ERRORS:
+            except Exception as error:
                 self.error_streak += 1
                 self.next_request_time += min(2 ** self.error_streak,
                                               self.max_retry_wait)
@@ -225,7 +238,7 @@ class ComputeNodeUpdateActor(config.actor_class):
             else:
                 self.error_streak = 0
                 return result
-        return wrapper
+        return throttle_wrapper
 
     @_throttle_errors
     def sync_node(self, cloud_node, arvados_node):
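
_throttle_errors now counts every failure toward error_streak, not just cloud errors, and pushes next_request_time out by min(2 ** error_streak, max_retry_wait) seconds. Assuming a max_retry_wait of 180 seconds, the same default the setup actor uses, the delays after successive failures run:

    >>> [min(2 ** k, 180) for k in range(1, 10)]
    [2, 4, 8, 16, 32, 64, 128, 180, 180]
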
@@ -240,13 +253,14 @@ class ComputeNodeMonitorActor(config.actor_class):
     for shutdown.
     """
     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 timer_actor, update_actor, arvados_node=None,
+                 cloud_fqdn_func, timer_actor, update_actor, arvados_node=None,
                  poll_stale_after=600, node_stale_after=3600):
         super(ComputeNodeMonitorActor, self).__init__()
         self._later = self.actor_ref.proxy()
         self._logger = logging.getLogger('arvnodeman.computenode')
         self._last_log = None
         self._shutdowns = shutdown_timer
+        self._cloud_node_fqdn = cloud_fqdn_func
         self._timer = timer_actor
         self._update = update_actor
         self.cloud_node = cloud_node
@@ -276,7 +290,7 @@ class ComputeNodeMonitorActor(config.actor_class):
         if (self.arvados_node is None) or not timestamp_fresh(
               arvados_node_mtime(self.arvados_node), self.node_stale_after):
             return None
-        state = self.arvados_node['info'].get('slurm_state')
+        state = self.arvados_node['crunch_worker_state']
         if not state:
             return None
         result = state in states
@@ -310,9 +324,11 @@ class ComputeNodeMonitorActor(config.actor_class):
             self.last_shutdown_opening = next_opening
 
     def offer_arvados_pair(self, arvados_node):
-        if self.arvados_node is not None:
+        first_ping_s = arvados_node.get('first_ping_at')
+        if (self.arvados_node is not None) or (not first_ping_s):
             return None
-        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
+        elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
+              (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
             self._later.update_arvados_node(arvados_node)
             return self.cloud_node.id
         else:
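
offer_arvados_pair now refuses to pair with an Arvados node record that has never pinged, and requires the first ping to postdate this cloud node's boot, so a stale record left over from an earlier node at the same IP address is never re-adopted. A worked example with a simplified stand-in for the imported arvados_timestamp helper (the timestamp format here is assumed):

    import calendar, time

    def arvados_timestamp(timestr):
        # Simplified stand-in: parse a UTC timestamp into epoch seconds.
        return calendar.timegm(time.strptime(timestr, '%Y-%m-%dT%H:%M:%SZ'))

    cloud_node_start_time = arvados_timestamp('2015-03-01T00:10:00Z')
    old_ping = arvados_timestamp('2015-03-01T00:05:00Z')  # stale record
    new_ping = arvados_timestamp('2015-03-01T00:12:00Z')  # this node's ping

    old_ping >= cloud_node_start_time   # False: never pair
    new_ping >= cloud_node_start_time   # True: pair if the IP matches too
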
@@ -324,9 +340,17 @@ class ComputeNodeMonitorActor(config.actor_class):
             self._later.consider_shutdown()
 
     def update_arvados_node(self, arvados_node):
+        # If the cloud node's FQDN doesn't match what's in the Arvados node
+        # record, make them match.
+        # This method is a little unusual in the way it just fires off the
+        # request without checking the result or retrying errors.  That's
+        # because this update happens every time we reload the Arvados node
+        # list: if a previous sync attempt failed, we'll see that the names
+        # are out of sync and just try again.  ComputeNodeUpdateActor has
+        # the logic to throttle those effective retries when there's trouble.
         if arvados_node is not None:
             self.arvados_node = arvados_node
-            new_hostname = arvados_node_fqdn(self.arvados_node)
-            if new_hostname != self.cloud_node.name:
+            if (self._cloud_node_fqdn(self.cloud_node) !=
+                  arvados_node_fqdn(self.arvados_node)):
                 self._update.sync_node(self.cloud_node, self.arvados_node)
             self._later.consider_shutdown()
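
The comparison driving those fire-and-forget syncs is now the cloud_fqdn_func callback against arvados_node_fqdn, rather than the bare cloud node name, so drivers that derive the FQDN differently still compare like with like. Restated as a free function (needs_sync is hypothetical; the import path follows the relative import at the top of this file):

    from arvnodeman.computenode import arvados_node_fqdn

    def needs_sync(cloud_fqdn_func, cloud_node, arvados_node):
        # Sync whenever the two views of the FQDN disagree; the node-list
        # poll calls this again next time if the sync request failed.
        return cloud_fqdn_func(cloud_node) != arvados_node_fqdn(arvados_node)
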