2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
11 from . import ComputeNodeMonitorActor
12 from . import ComputeNodeSetupActor as SetupActorBase
13 from . import ComputeNodeShutdownActor as ShutdownActorBase
14 from . import ComputeNodeUpdateActor as UpdateActorBase
15 from .. import RetryMixin
17 class SlurmMixin(object):
18 SLURM_END_STATES = frozenset(['down\n', 'down*\n',
19 'drain\n', 'drain*\n',
21 SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
23 def _update_slurm_node(self, nodename, updates):
24 cmd = ['scontrol', 'update', 'NodeName=' + nodename] + updates
26 subprocess.check_output(cmd)
29 "SLURM update %r failed", cmd, exc_info=True)
31 def _get_slurm_state(self, nodename):
32 return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
35 class ComputeNodeSetupActor(SlurmMixin, SetupActorBase):
36 def create_cloud_node(self):
37 hostname = self.arvados_node.get("hostname")
39 self._update_slurm_node(self.arvados_node['hostname'], [
40 'Weight=%i' % int(self.cloud_size.price * 1000),
41 'Features=instancetype='+self.cloud_size.name,
43 return super(ComputeNodeSetupActor, self).create_cloud_node()
46 class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
48 arv_node = self._arvados_node()
51 return super(ComputeNodeShutdownActor, self).on_start()
54 self._nodename = arv_node['hostname']
55 self._logger.info("Draining SLURM node %s", self._nodename)
56 self._later.issue_slurm_drain()
58 @RetryMixin._retry((subprocess.CalledProcessError, OSError))
59 def cancel_shutdown(self, reason, try_resume=True):
61 if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
62 # Resume from "drng" or "drain"
63 self._update_slurm_node(self._nodename, ['State=RESUME'])
65 # Node is in a state such as 'idle' or 'alloc' so don't
66 # try to resume it because that will just raise an error.
68 return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)
70 @RetryMixin._retry((subprocess.CalledProcessError, OSError))
71 def issue_slurm_drain(self):
72 if self.cancel_reason is not None:
75 self._update_slurm_node(self._nodename, [
76 'State=DRAIN', 'Reason=Node Manager shutdown'])
77 self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
78 self._later.await_slurm_drain()
80 self._later.shutdown_node()
82 @RetryMixin._retry((subprocess.CalledProcessError, OSError))
83 def await_slurm_drain(self):
84 if self.cancel_reason is not None:
86 output = self._get_slurm_state(self._nodename)
87 if output in ("drng\n", "alloc\n", "drng*\n", "alloc*\n"):
88 self._timer.schedule(time.time() + 10,
89 self._later.await_slurm_drain)
90 elif output in ("idle\n",):
91 # Not in "drng" but idle, don't shut down
92 self.cancel_shutdown("slurm state is %s" % output.strip(), try_resume=False)
95 self._later.shutdown_node()
97 def _destroy_node(self):
99 self._update_slurm_node(self._nodename, [
100 'State=DOWN', 'Reason=Node Manager shutdown'])
101 super(ComputeNodeShutdownActor, self)._destroy_node()
104 class ComputeNodeUpdateActor(SlurmMixin, UpdateActorBase):
105 def sync_node(self, cloud_node, arvados_node):
106 hostname = arvados_node.get("hostname")
108 self._update_slurm_node(hostname, [
109 'Weight=%i' % int(cloud_node.size.price * 1000),
110 'Features=instancetype=' + cloud_node.size.name,
112 return super(ComputeNodeUpdateActor, self).sync_node(
113 cloud_node, arvados_node)