@RetryMixin._retry()
def sync_node(self, cloud_node, arvados_node):
- return self._cloud.sync_node(cloud_node, arvados_node)
+ """Sync the cloud node from its Arvados record when their FQDNs differ.
+
+ Returns the cloud driver's sync_node() result, or None (no cloud call
+ is made) when the two FQDNs already match.
+ """
+ if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
+ return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
for shutdown.
"""
def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
- cloud_fqdn_func, timer_actor, update_actor, cloud_client,
+ timer_actor, update_actor, cloud_client,
arvados_node=None, poll_stale_after=600, node_stale_after=3600,
boot_fail_after=1800
):
super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.tell_proxy()
self._shutdowns = shutdown_timer
- self._cloud_node_fqdn = cloud_fqdn_func
+ # This actor does not compare cloud and Arvados FQDNs itself; that is
+ # presumably left to update_actor.sync_node() (see update_arvados_node).
self._timer = timer_actor
self._update = update_actor
self._cloud = cloud_client
self._later.consider_shutdown()
def update_arvados_node(self, arvados_node):
- # If the cloud node's FQDN doesn't match what's in the Arvados node
- # record, make them match.
+ """Called when the latest Arvados node record is retrieved.
+
+ Stores a non-None record and passes it to the updater's sync_node()
+ method.
+
+ """
# This method is a little unusual in the way it just fires off the
# request without checking the result or retrying errors. That's
# because this update happens every time we reload the Arvados node
# the logic to throttle those effective retries when there's trouble.
if arvados_node is not None:
self.arvados_node = arvados_node
- if (self._cloud_node_fqdn(self.cloud_node) !=
- arvados_node_fqdn(self.arvados_node)):
- self._update.sync_node(self.cloud_node, self.arvados_node)
+ # Called unconditionally: the updater's sync_node() decides what,
+ # if anything, actually needs changing.
+ self._update.sync_node(self.cloud_node, self.arvados_node)
self._later.consider_shutdown()
+ # exc_info=True attaches the active exception's traceback to the log record.
self._logger.error(
"SLURM update %r failed", cmd, exc_info=True)
+ def _update_slurm_size_attrs(self, nodename, size):
+ """Push a cloud size's scheduling attributes to SLURM for nodename.
+
+ Weight is the size's price times 1000, truncated to an int; Features
+ carries the size name as instancetype=<name>.
+ """
+ weight = int(size.price * 1000)
+ slurm_attrs = ['Weight=%i' % weight,
+ 'Features=instancetype=' + size.name]
+ self._update_slurm_node(nodename, slurm_attrs)
+
def _get_slurm_state(self, nodename):
+ """Return raw `sinfo` output: nodename's compact state (%t), no header."""
return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
def create_cloud_node(self):
+ """Push the size's attributes to SLURM if the record has a hostname,
+ then create the cloud node via the parent class."""
hostname = self.arvados_node.get("hostname")
if hostname:
- self._update_slurm_node(self.arvados_node['hostname'], [
- 'Weight=%i' % int(self.cloud_size.price * 1000),
- 'Features=instancetype='+self.cloud_size.name,
- ])
+ self._update_slurm_size_attrs(hostname, self.cloud_size)
return super(ComputeNodeSetupActor, self).create_cloud_node()
class ComputeNodeUpdateActor(SlurmMixin, UpdateActorBase):
def sync_node(self, cloud_node, arvados_node):
+ """Keep SLURM's node properties up to date, then run the parent sync."""
hostname = arvados_node.get("hostname")
if hostname:
- self._update_slurm_node(hostname, [
- 'Weight=%i' % int(cloud_node.size.price * 1000),
- 'Features=instancetype=' + cloud_node.size.name,
- ])
+ # This is only needed when slurm has restarted and lost
+ # the dynamically configured node properties. So it's
+ # usually redundant, but detecting when it's necessary
+ # would be about the same amount of work as doing it
+ # repetitively.
+ self._update_slurm_size_attrs(hostname, cloud_node.size)
+ # Records without a hostname skip the SLURM update; the parent
+ # class's sync runs either way.
return super(ComputeNodeUpdateActor, self).sync_node(
cloud_node, arvados_node)
+ # Passed by keyword, so reordering the constructor's positional
+ # parameters cannot silently mis-bind these arguments.
cloud_node=cloud_node,
cloud_node_start_time=start_time,
shutdown_timer=shutdown_timer,
- cloud_fqdn_func=self._cloud_driver.node_fqdn,
update_actor=self._cloud_updater,
timer_actor=self._timer,
arvados_node=None,
self.assertEqual(self.arvados_effect[-1],
self.setup_actor.arvados_node.get(self.TIMEOUT))
assert(finished.wait(self.TIMEOUT))
+ # `called_with` is not a Mock assertion: it silently auto-creates an
+ # attribute (and raises AttributeError on Python 3.12+). assert_any_call
+ # really checks the request was made, even though create() is also
+ # called without arguments below.
+ self.api_client.nodes().create.assert_any_call(body={}, assign_slot=True)
self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
self.assert_node_properties_updated()
self.setup_actor.arvados_node.get(self.TIMEOUT))
assert(finished.wait(self.TIMEOUT))
self.assert_node_properties_updated()
- self.assertEqual(2, self.api_client.nodes().update().execute.call_count)
+ # TODO(review): assert the assign_slot request for this path. A mock
+ # `create.called_with(...)` check would be a no-op, and if this test
+ # starts from an existing node record the request presumably goes
+ # through update() rather than create() -- confirm against the setup
+ # actor and assert on that call with assert_any_call.
+ self.assertEqual(3, self.api_client.nodes().update().execute.call_count)
self.assertEqual(self.cloud_client.create_node(),
self.setup_actor.cloud_node.get(self.TIMEOUT))
start_time = time.time()
+ # Positional order: cloud node, start time, shutdown timer, timer,
+ # updater, cloud client, Arvados node.
monitor_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_node, start_time, self.shutdowns,
- testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+ self.timer, self.updates, self.cloud_client,
self.arvados_node)
self.shutdown_actor = self.ACTOR_CLASS.start(
self.timer, self.cloud_client, self.arvados_client, monitor_actor,
start_time = time.time()
+ # boot_fail_after=300 overrides the monitor's default of 1800.
self.node_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_mock, start_time, self.shutdowns,
- testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+ self.timer, self.updates, self.cloud_client,
arv_node, boot_fail_after=300).proxy()
self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
self.assertEqual(testutil.ip_address_mock(4),
current_arvados['ip_address'])
- def test_update_arvados_node_syncs_when_fqdn_mismatch(self):
+ def test_update_arvados_node_calls_sync_node(self):
+ """update_arvados_node() passes the record to sync_node() exactly once."""
self.make_mocks(5)
self.cloud_mock.extra['testname'] = 'cloudfqdn.zzzzz.arvadosapi.com'
self.make_actor()
arv_node = testutil.arvados_node_mock(5)
self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
self.assertEqual(1, self.updates.sync_node.call_count)
-
- def test_update_arvados_node_skips_sync_when_fqdn_match(self):
- self.make_mocks(6)
- arv_node = testutil.arvados_node_mock(6)
- self.cloud_mock.extra['testname'] ='{n[hostname]}.{n[domain]}'.format(
- n=arv_node)
- self.make_actor()
- self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
- self.assertEqual(0, self.updates.sync_node.call_count)
@mock.patch('subprocess.check_output')
def test_update_node_features(self, check_output):
- self.make_mocks()
+ """Node setup runs `scontrol update` with the size's Weight and Features."""
+ # `scontrol update` happens only if the Arvados node record
+ # has a hostname. ComputeNodeSetupActorTestCase.make_mocks
+ # uses mocks with scrubbed hostnames, so we override with the
+ # default testutil.arvados_node_mock.
+ self.make_mocks(arvados_effect=[testutil.arvados_node_mock()])
self.make_actor()
self.wait_for_assignment(self.setup_actor, 'cloud_node')
check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=1000', 'Features=instancetype=z1.test'])
def cloud_node_fqdn(node):
# We intentionally put the FQDN somewhere goofy to make sure tested code is
# using this function for lookups.
- return node.extra.get('testname', 'NoTestName')
+ # The fallback embeds node.name under the reserved .invalid TLD, so mocks
+ # with different names get different FQDNs that can't match a real host.
+ return node.extra.get('testname', node.name+'.NoTestName.invalid')
def ip_address_mock(last_octet):
+ """Return the fake IPv4 address string '10.20.30.<last_octet>'."""
return '10.20.30.{}'.format(last_octet)