Merge branch '8784-dir-listings'
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / slurm.py
index c395a3017042c4a55726594ff36351fc2270363e..fa56578cffa1108526584ded9730a9cb5ffbbda9 100644 (file)
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 from __future__ import absolute_import, print_function
 
@@ -6,68 +9,88 @@ import subprocess
 import time
 
 from . import \
-    ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
+    ComputeNodeSetupActor, ComputeNodeMonitorActor
 from . import ComputeNodeShutdownActor as ShutdownActorBase
-from .. import _retry
+from . import ComputeNodeUpdateActor as UpdateActorBase
+from .. import RetryMixin
 
-class ComputeNodeShutdownActor(ShutdownActorBase):
+class SlurmMixin(object):
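+    # Common helpers for driving SLURM through its command-line tools
+    # (scontrol and sinfo), factored out so other actors can reuse them.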
     SLURM_END_STATES = frozenset(['down\n', 'down*\n',
                                   'drain\n', 'drain*\n',
                                   'fail\n', 'fail*\n'])
     SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
 
+    def _set_node_state(self, nodename, state, *args):
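+        # Change the node's state with "scontrol update"; check_output
+        # raises CalledProcessError if scontrol exits nonzero.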
+        cmd = ['scontrol', 'update', 'NodeName=' + nodename,
+               'State=' + state]
+        cmd.extend(args)
+        subprocess.check_output(cmd)
+
+    def _get_slurm_state(self, nodename):
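+        # Ask sinfo for the node's current state.  The result keeps sinfo's
+        # trailing newline (e.g. "drain\n"), matching the state sets above.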
+        return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
+
+
+class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
     def on_start(self):
         arv_node = self._arvados_node()
         if arv_node is None:
             self._nodename = None
             return super(ComputeNodeShutdownActor, self).on_start()
         else:
+            self._set_logger()
             self._nodename = arv_node['hostname']
             self._logger.info("Draining SLURM node %s", self._nodename)
             self._later.issue_slurm_drain()
 
-    def _set_node_state(self, state, *args):
-        cmd = ['scontrol', 'update', 'NodeName=' + self._nodename,
-               'State=' + state]
-        cmd.extend(args)
-        subprocess.check_output(cmd)
-
-    def _get_slurm_state(self):
-        return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
-
-    # The following methods retry on OSError.  This is intended to mitigate bug
-    # #6321 where fork() of node manager raises "OSError: [Errno 12] Cannot
-    # allocate memory" resulting in the untimely death of the shutdown actor
-    # and tends to result in node manager getting into a wedged state where it
-    # won't allocate new nodes or shut down gracefully.  The underlying causes
-    # of the excessive memory usage that result in the "Cannot allocate memory"
-    # error are still being investigated.
-
-    @_retry((subprocess.CalledProcessError, OSError))
-    def cancel_shutdown(self, reason):
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
+    def cancel_shutdown(self, reason, try_resume=True):
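+        # try_resume=False is used by await_slurm_drain when the node turns
+        # out to be idle, so the scontrol RESUME call is skipped in that case.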
         if self._nodename:
-            if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
+            if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
                 # Resume from "drng" or "drain"
-                self._set_node_state('RESUME')
+                self._set_node_state(self._nodename, 'RESUME')
             else:
                 # Node is in a state such as 'idle' or 'alloc' so don't
                 # try to resume it because that will just raise an error.
                 pass
         return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)
 
-    @_retry((subprocess.CalledProcessError, OSError))
-    @ShutdownActorBase._stop_if_window_closed
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
     def issue_slurm_drain(self):
-        self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
-        self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
-        self._later.await_slurm_drain()
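+        # _stop_if_window_closed is no longer applied to this method;
+        # instead, return early if the shutdown has already been cancelled.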
+        if self.cancel_reason is not None:
+            return
+        if self._nodename:
+            self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
+            self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
+            self._later.await_slurm_drain()
+        else:
+            self._later.shutdown_node()
 
-    @_retry((subprocess.CalledProcessError, OSError))
-    @ShutdownActorBase._stop_if_window_closed
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
     def await_slurm_drain(self):
-        output = self._get_slurm_state()
-        if output in self.SLURM_END_STATES:
-            self._later.shutdown_node()
-        else:
+        if self.cancel_reason is not None:
+            return
+        output = self._get_slurm_state(self._nodename)
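+        # "drng"/"alloc" mean the drain has not finished yet; poll again.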
+        if output in ("drng\n", "alloc\n", "drng*\n", "alloc*\n"):
             self._timer.schedule(time.time() + 10,
                                  self._later.await_slurm_drain)
+        elif output in ("idle\n"):
+            # Not in "drng" but idle, don't shut down
+            self.cancel_shutdown("slurm state is %s" % output.strip(), try_resume=False)
+        else:
+            # Any other state ("down", "drain", "fail", ...): the drain has
+            # finished or the node is already unusable, so shut it down.
+            self._later.shutdown_node()
+
+    def _destroy_node(self):
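+        # Mark the node DOWN in SLURM before destroying the cloud instance,
+        # so the scheduler stops trying to use it.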
+        if self._nodename:
+            self._set_node_state(self._nodename, 'DOWN', 'Reason=Node Manager shutdown')
+        super(ComputeNodeShutdownActor, self)._destroy_node()
+
+
+class ComputeNodeUpdateActor(UpdateActorBase):
+    def sync_node(self, cloud_node, arvados_node):
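+        # Push the node's price (scaled to an integer) into SLURM as the node
+        # weight so that, all else being equal, SLURM prefers cheaper nodes,
+        # then fall through to the base class sync.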
+        if arvados_node.get("hostname"):
+            try:
+                subprocess.check_output(
+                    ['scontrol', 'update',
+                     'NodeName=' + arvados_node["hostname"],
+                     'Weight=%i' % int(cloud_node.size.price * 1000)])
+            except Exception:
+                self._logger.error("Unable to set SLURM node weight.", exc_info=True)
+        return super(ComputeNodeUpdateActor, self).sync_node(cloud_node, arvados_node)