Merge branch 'master' into 3198-writable-fuse
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
index 0fab1b0fec5f3e0fd0696460089028ab244cfb66..6d5c223fac15d6e25a95f44446eb88f4b54a6f42 100644 (file)
@@ -20,12 +20,13 @@ class ComputeNodeStateChangeBase(config.actor_class):
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, cloud_client, timer_actor,
+    def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
                  retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
         self._later = self.actor_ref.proxy()
         self._logger = logging.getLogger(logger_name)
         self._cloud = cloud_client
                  retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
         self._later = self.actor_ref.proxy()
         self._logger = logging.getLogger(logger_name)
         self._cloud = cloud_client
+        self._arvados = arvados_client
         self._timer = timer_actor
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
         self._timer = timer_actor
         self.min_retry_wait = retry_wait
         self.max_retry_wait = max_retry_wait
@@ -79,6 +80,18 @@ class ComputeNodeStateChangeBase(config.actor_class):
         else:
             self.subscribers.add(subscriber)
 
         else:
             self.subscribers.add(subscriber)
 
+    def _clean_arvados_node(self, arvados_node, explanation):  # Reset one Arvados node record; returns the executed update() result.
+        return self._arvados.nodes().update(
+            uuid=arvados_node['uuid'],  # Only the 'uuid' key of arvados_node is read here.
+            body={'hostname': None,  # Every field below except last_action is sent as None.
+                  'ip_address': None,
+                  'slot_number': None,
+                  'first_ping_at': None,
+                  'last_ping_at': None,
+                  'info': {'ec2_instance_id': None,
+                           'last_action': explanation}},  # explanation is stored as info['last_action'].
+            ).execute()
+
 
 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
 
 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
@@ -93,9 +106,8 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', cloud_client, timer_actor,
+            'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
             retry_wait, max_retry_wait)
-        self._arvados = arvados_client
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
@@ -111,16 +123,8 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
 
     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
 
     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
-        self.arvados_node = self._arvados.nodes().update(
-            uuid=node['uuid'],
-            body={'hostname': None,
-                  'ip_address': None,
-                  'slot_number': None,
-                  'first_ping_at': None,
-                  'last_ping_at': None,
-                  'info': {'ec2_instance_id': None,
-                           'last_action': "Prepared by Node Manager"}}
-            ).execute()
+        self.arvados_node = self._clean_arvados_node(  # Same reset request as before, now built by the shared base-class helper.
+            node, "Prepared by Node Manager")  # This string becomes info['last_action'] on the record.
         self._later.create_cloud_node()
 
     @ComputeNodeStateChangeBase._retry()
         self._later.create_cloud_node()
 
     @ComputeNodeStateChangeBase._retry()
@@ -150,7 +154,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     This actor simply destroys a cloud node, retrying as needed.
     """
 
     This actor simply destroys a cloud node, retrying as needed.
     """
-    def __init__(self, timer_actor, cloud_client, node_monitor,
+    def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
                  cancellable=True, retry_wait=1, max_retry_wait=180):
         # If a ShutdownActor is cancellable, it will ask the
         # ComputeNodeMonitorActor if it's still eligible before taking each
                  cancellable=True, retry_wait=1, max_retry_wait=180):
         # If a ShutdownActor is cancellable, it will ask the
         # ComputeNodeMonitorActor if it's still eligible before taking each
@@ -158,7 +162,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', cloud_client, timer_actor,
+            'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
             retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
@@ -168,9 +172,16 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     def on_start(self):
         self._later.shutdown_node()
 
     def on_start(self):
         self._later.shutdown_node()
 
+    def _arvados_node(self):  # Return the arvados_node value read from the monitor actor's proxy.
+        return self._monitor.arvados_node.get()  # presumably .get() blocks until the proxy call resolves -- TODO confirm.
+
+    def _finished(self, success_flag=None):  # Optionally record the outcome, then delegate to the base class _finished().
+        if success_flag is not None:  # None means: leave self.success untouched.
+            self.success = success_flag
+        return super(ComputeNodeShutdownActor, self)._finished()  # Base implementation is outside this view.
+
     def cancel_shutdown(self):
     def cancel_shutdown(self):
-        self.success = False
-        self._finished()
+        self._finished(success_flag=False)
 
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
 
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
@@ -189,13 +200,20 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
     @_stop_if_window_closed
     @ComputeNodeStateChangeBase._retry()
     def shutdown_node(self):
     @_stop_if_window_closed
     @ComputeNodeStateChangeBase._retry()
     def shutdown_node(self):
-        if self._cloud.destroy_node(self.cloud_node):
-            self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
-            self.success = True
-            self._finished()
-        else:
+        if not self._cloud.destroy_node(self.cloud_node):  # A falsy result is treated as a failed destroy.
             # Force a retry.
             raise cloud_types.LibcloudError("destroy_node failed")
             # Force a retry.
             raise cloud_types.LibcloudError("destroy_node failed")
+        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)  # Reached only when destroy_node returned a truthy value.
+        arv_node = self._arvados_node()  # Arvados record as reported by the monitor; None is handled below.
+        if arv_node is None:
+            self._finished(success_flag=True)  # No Arvados record to reset, so report success right away.
+        else:
+            self._later.clean_arvados_node(arv_node)  # Hand off via this actor's own proxy; that step reports success.
+
+    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)  # presumably retries on these error types; _retry is not shown here -- TODO confirm.
+    def clean_arvados_node(self, arvados_node):  # Reset the node's Arvados record, then mark the shutdown successful.
+        self._clean_arvados_node(arvados_node, "Shut down by Node Manager")  # The helper's return value is discarded here.
+        self._finished(success_flag=True)  # Not reached if the line above raises.
 
     # Make the decorator available to subclasses.
     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
 
     # Make the decorator available to subclasses.
     _stop_if_window_closed = staticmethod(_stop_if_window_closed)