import subprocess
import time

from . import \
-    ComputeNodeSetupActor, ComputeNodeUpdateActor
+    ComputeNodeSetupActor, ComputeNodeMonitorActor
from . import ComputeNodeShutdownActor as ShutdownActorBase
-from . import ComputeNodeMonitorActor as MonitorActorBase
+from . import ComputeNodeUpdateActor as UpdateActorBase
from .. import RetryMixin


class SlurmMixin(object):
    SLURM_END_STATES = frozenset(['down\n', 'down*\n',
                                  'drain\n', 'drain*\n',
                                  'fail\n', 'fail*\n'])
    SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])

    def _set_node_state(self, nodename, state, *args):
        cmd = ['scontrol', 'update', 'NodeName=' + nodename,
               'State=' + state]
        cmd.extend(args)
        subprocess.check_output(cmd)

    def _get_slurm_state(self, nodename):
        return subprocess.check_output(
            ['sinfo', '--noheader', '-o', '%t', '-n', nodename])


class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
    def on_start(self):
        arv_node = self._arvados_node()
        if arv_node is None:
            self._nodename = None
            return super(ComputeNodeShutdownActor, self).on_start()
        else:
            self._set_logger()
            self._nodename = arv_node['hostname']
            self._logger.info("Draining SLURM node %s", self._nodename)
            self._later.issue_slurm_drain()

    @RetryMixin._retry((subprocess.CalledProcessError,))
-    def cancel_shutdown(self, reason):
+    def cancel_shutdown(self, reason, try_resume=True):
        if self._nodename:
-            if self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
+            if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
                # Resume from "drng" or "drain"
                self._set_node_state(self._nodename, 'RESUME')
            else:
                # Node is in a state such as 'idle' or 'alloc'; trying to
                # resume it would just make scontrol raise an error.
                pass
        return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)

    @RetryMixin._retry((subprocess.CalledProcessError,))
-    @ShutdownActorBase._stop_if_window_closed
    def issue_slurm_drain(self):
-        self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
-        self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
-        self._later.await_slurm_drain()
+        if self.cancel_reason is not None:
+            # The shutdown has already been cancelled; don't drain.
+            return
+        if self._nodename:
+            self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
+            self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
+            self._later.await_slurm_drain()
+        else:
+            self._later.shutdown_node()
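
    # await_slurm_drain() polls the node's sinfo state every 10 seconds
    # until it leaves the draining/allocated states.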
    @RetryMixin._retry((subprocess.CalledProcessError,))
-    @ShutdownActorBase._stop_if_window_closed
    def await_slurm_drain(self):
+        if self.cancel_reason is not None:
+            return
        output = self._get_slurm_state(self._nodename)
-        if output in self.SLURM_END_STATES:
-            self._later.shutdown_node()
-        else:
+        if output in ("drng\n", "alloc\n", "drng*\n", "alloc*\n"):
            self._timer.schedule(time.time() + 10,
                                 self._later.await_slurm_drain)
+        elif output in ("idle\n",):
+            # Not in "drng" but idle: the node is back in normal service,
+            # so don't shut it down. It isn't drained, so don't try to
+            # resume it either.
+            self.cancel_shutdown("slurm state is %s" % output.strip(),
+                                 try_resume=False)
+        else:
+            # Any other state ("drain", "down", "fail", ...) means the
+            # node is out of service; proceed with the shutdown.
+            self._later.shutdown_node()
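
+    # Mark the node DOWN in slurm before destroying the cloud node, so the
+    # scheduler cannot assign it new work during teardown.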
+    def _destroy_node(self):
+        if self._nodename:
+            self._set_node_state(self._nodename, 'DOWN', 'Reason=Node Manager shutdown')
+        super(ComputeNodeShutdownActor, self)._destroy_node()


-class ComputeNodeMonitorActor(SlurmMixin, MonitorActorBase):
-
-    def shutdown_eligible(self):
-        if self.arvados_node is not None:
-            state = self._get_slurm_state(self.arvados_node['hostname'])
-            # Automatically eligible for shutdown if it's down or failed, but
-            # not drain to avoid a race condition with resume_node().
-            if state in self.SLURM_END_STATES:
-                if state in self.SLURM_DRAIN_STATES:
-                    return "node is draining"
-                else:
-                    return True
-        return super(ComputeNodeMonitorActor, self).shutdown_eligible()
-
-    def resume_node(self):
-        try:
-            if (self.arvados_node is not None and
-                self._get_slurm_state(self.arvados_node['hostname']) in self.SLURM_DRAIN_STATES):
-                # Resume from "drng" or "drain"
-                self._set_node_state(self.arvados_node['hostname'], 'RESUME')
-        except Exception as error:
-            self._logger.warn(
-                "Exception reenabling node: %s", error, exc_info=error)
+class ComputeNodeUpdateActor(UpdateActorBase):
+    def sync_node(self, cloud_node, arvados_node):
+        if arvados_node.get("hostname"):
+            try:
+                subprocess.check_output(
+                    ['scontrol', 'update',
+                     'NodeName=' + arvados_node["hostname"],
+                     'Weight=%i' % int(cloud_node.size.price * 1000)])
+            except Exception:
+                self._logger.error("Unable to set slurm node weight.",
+                                   exc_info=True)
+        return super(ComputeNodeUpdateActor, self).sync_node(cloud_node, arvados_node)
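
For a quick sanity check of the weight formula used in sync_node() above,
here is a minimal standalone sketch (not part of the diff); FakeSize is a
hypothetical stand-in for the libcloud NodeSize object normally found at
cloud_node.size:

    class FakeSize(object):
        # Hypothetical stand-in for libcloud's NodeSize; only .price is used.
        def __init__(self, price):
            self.price = price

    def slurm_weight(size):
        # Same formula as sync_node(): scontrol only accepts integer weights,
        # and slurm fills lower-weight nodes first, so scaling the hourly
        # price by 1000 makes cheaper sizes preferred.
        return int(size.price * 1000)

    assert slurm_weight(FakeSize(0.526)) == 526
    assert slurm_weight(FakeSize(1.25)) < slurm_weight(FakeSize(2.5))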