This actor simply destroys a cloud node, retrying as needed.
"""
def __init__(self, timer_actor, cloud_client, node_monitor,
- retry_wait=1, max_retry_wait=180):
+ cancellable=True, retry_wait=1, max_retry_wait=180):
+ # If a ShutdownActor is cancellable, it will ask the
+ # ComputeNodeMonitorActor if it's still eligible before taking each
+ # action, and stop the shutdown process if the node is no longer
+ # eligible. Normal shutdowns based on job demand should be
+ # cancellable; shutdowns based on node misbehavior should not.
super(ComputeNodeShutdownActor, self).__init__(
'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
self._cloud = cloud_client
self._monitor = node_monitor.proxy()
self.cloud_node = self._monitor.cloud_node.get()
+ self.cancellable = cancellable
self.success = None
def on_start(self):
def _stop_if_window_closed(orig_func):
@functools.wraps(orig_func)
def wrapper(self, *args, **kwargs):
- if not self._monitor.shutdown_eligible().get():
+ if (self.cancellable and
+ (not self._monitor.shutdown_eligible().get())):
self._logger.info(
"Cloud node %s shutdown cancelled - no longer eligible.",
self.cloud_node.id)
import importlib
import logging
import ssl
+import sys
import arvados
import httplib2
'poll_time': '60',
'max_poll_time': '300',
'poll_stale_after': '600',
+ 'boot_fail_after': str(sys.maxint),
'node_stale_after': str(60 * 60 * 2)},
'Logging': {'file': '/dev/stderr',
'level': 'WARNING'},
self.nodes = {}
self.orphans = {}
- def __getitem__(self, key):
- return self.nodes[key]
-
- def __len__(self):
- return len(self.nodes)
+ # Proxy the methods listed below to self.nodes.
+ def _proxy_method(name):
+ method = getattr(dict, name)
+ @functools.wraps(method, ('__name__', '__doc__'))
+ def wrapper(self, *args, **kwargs):
+ return method(self.nodes, *args, **kwargs)
+ return wrapper
- def get(self, key, default=None):
- return self.nodes.get(key, default)
+ for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
+ locals()[_method_name] = _proxy_method(_method_name)
def record_key(self, record):
return self.item_key(getattr(record, self.RECORD_ATTR))
cloud_nodes_actor, cloud_update_actor, timer_actor,
arvados_factory, cloud_factory,
shutdown_windows, min_nodes, max_nodes,
- poll_stale_after=600, node_stale_after=7200,
+ poll_stale_after=600,
+ boot_fail_after=1800,
+ node_stale_after=7200,
node_setup_class=dispatch.ComputeNodeSetupActor,
node_shutdown_class=dispatch.ComputeNodeShutdownActor,
node_actor_class=dispatch.ComputeNodeMonitorActor):
self.min_nodes = min_nodes
self.max_nodes = max_nodes
self.poll_stale_after = poll_stale_after
+ self.boot_fail_after = boot_fail_after
self.node_stale_after = node_stale_after
self.last_polls = {}
for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
return pykka.get_all([getattr(actor, name) for name in attr_names])
def node_up(self, setup_proxy):
- cloud_node, arvados_node = self._get_actor_attrs(
- setup_proxy, 'cloud_node', 'arvados_node')
+ cloud_node = setup_proxy.cloud_node.get()
del self.booting[setup_proxy.actor_ref.actor_urn]
setup_proxy.stop()
record = self.cloud_nodes.get(cloud_node.id)
if record is None:
record = self._new_node(cloud_node)
self.booted[cloud_node.id] = record
- self._pair_nodes(record, arvados_node)
+ self._timer.schedule(time.time() + self.boot_fail_after,
+ self._later.shutdown_unpaired_node, cloud_node.id)
@_check_poll_freshness
def stop_booting_node(self):
self._later.stop_booting_node()
break
- @_check_poll_freshness
- def node_can_shutdown(self, node_actor):
- if self._nodes_excess() < 1:
- return None
+ def _begin_node_shutdown(self, node_actor, cancellable):
cloud_node_id = node_actor.cloud_node.get().id
if cloud_node_id in self.shutdowns:
return None
shutdown = self._node_shutdown.start(
timer_actor=self._timer, cloud_client=self._new_cloud(),
- node_monitor=node_actor.actor_ref).proxy()
+ node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
self.shutdowns[cloud_node_id] = shutdown
shutdown.subscribe(self._later.node_finished_shutdown)
+ @_check_poll_freshness
+ def node_can_shutdown(self, node_actor):
+ if self._nodes_excess() > 0:
+ self._begin_node_shutdown(node_actor, cancellable=True)
+
+ def shutdown_unpaired_node(self, cloud_node_id):
+ for record_dict in [self.cloud_nodes, self.booted]:
+ if cloud_node_id in record_dict:
+ record = record_dict[cloud_node_id]
+ break
+ else:
+ return None
+ if record.arvados_node is None:
+ self._begin_node_shutdown(record.actor, cancellable=False)
+
def node_finished_shutdown(self, shutdown_actor):
success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
'cloud_node')
config.getint('Daemon', 'min_nodes'),
config.getint('Daemon', 'max_nodes'),
config.getint('Daemon', 'poll_stale_after'),
+ config.getint('Daemon', 'boot_fail_after'),
config.getint('Daemon', 'node_stale_after'),
node_setup, node_shutdown, node_monitor).proxy()
# information is too outdated.
poll_stale_after = 600
+# If Node Manager boots a cloud node, and it does not pair with an Arvados
+# node before this long, assume that there was a cloud bootstrap failure and
+# shut it down.  This shutdown is started as not cancellable, so it proceeds
+# even outside the normal shutdown windows configured in the Cloud section.
+boot_fail_after = 1800
+
# "Node stale time" affects two related behaviors.
# 1. If a compute node has been running for at least this long, but it
# isn't paired with an Arvados node, do not shut it down, but leave it alone.
self.cloud_node = cloud_node
self.arvados_node = arvados_node
- def make_actor(self):
+ def make_actor(self, cancellable=True):
if not hasattr(self, 'timer'):
self.make_mocks()
monitor_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_node, time.time(), self.shutdowns, self.timer,
self.updates, self.arvados_node)
self.shutdown_actor = self.ACTOR_CLASS.start(
- self.timer, self.cloud_client, monitor_actor).proxy()
+ self.timer, self.cloud_client, monitor_actor, cancellable).proxy()
self.monitor_actor = monitor_actor.proxy()
def check_success_flag(self, expected, allow_msg_count=1):
else:
self.fail("success flag {} is not {}".format(last_flag, expected))
+ def test_uncancellable_shutdown(self, *mocks):
+ self.make_mocks(shutdown_open=False)
+ self.cloud_client.destroy_node.return_value = False
+ self.make_actor(cancellable=False)
+ self.check_success_flag(None, 0)
+ self.shutdowns._set_state(True, 600)
+ self.cloud_client.destroy_node.return_value = True
+ self.check_success_flag(True)
+
class ComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
unittest.TestCase):
self.cloud_factory = mock.MagicMock(name='cloud_mock')
self.cloud_factory().node_start_time.return_value = time.time()
self.cloud_updates = mock.MagicMock(name='updates_mock')
- self.timer = testutil.MockTimer()
+ self.timer = testutil.MockTimer(deliver_immediately=False)
self.node_setup = mock.MagicMock(name='setup_mock')
self.node_shutdown = mock.MagicMock(name='shutdown_mock')
self.daemon = nmdaemon.NodeManagerDaemonActor.start(
self.server_wishlist_poller, self.arvados_nodes_poller,
self.cloud_nodes_poller, self.cloud_updates, self.timer,
self.arv_factory, self.cloud_factory,
- [54, 5, 1], min_nodes, max_nodes, 600, 3600,
+ [54, 5, 1], min_nodes, max_nodes, 600, 1800, 3600,
self.node_setup, self.node_shutdown).proxy()
if cloud_nodes is not None:
self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
def alive_monitor_count(self):
return sum(1 for actor in self.monitor_list() if actor.is_alive())
+ def assertShutdownCancellable(self, expected=True):
+ self.assertTrue(self.node_shutdown.start.called)
+ self.assertIs(expected,
+ self.node_shutdown.start.call_args[1]['cancellable'],
+ "ComputeNodeShutdownActor incorrectly cancellable")
+
def test_easy_node_creation(self):
size = testutil.MockSize(1)
self.make_daemon(want_sizes=[size])
monitor = self.monitor_list()[0].proxy()
self.daemon.update_server_wishlist([])
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
- self.assertTrue(self.node_shutdown.start.called,
- "daemon did not shut down booted node on offer")
+ self.assertShutdownCancellable(True)
shutdown = self.node_shutdown.start().proxy()
shutdown.cloud_node.get.return_value = cloud_node
self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
self.assertTrue(self.node_setup.start.called,
"second node not started after booted node stopped")
+ def test_booted_node_shut_down_when_never_listed(self):
+ setup = self.start_node_boot()
+ self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+ self.assertFalse(self.node_shutdown.start.called)
+ self.timer.deliver()
+ self.stop_proxy(self.daemon)
+ self.assertShutdownCancellable(False)
+
+ def test_booted_node_shut_down_when_never_paired(self):
+ cloud_node = testutil.cloud_node_mock(2)
+ setup = self.start_node_boot(cloud_node)
+ self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+ self.daemon.update_cloud_nodes([cloud_node])
+ self.timer.deliver()
+ self.stop_proxy(self.daemon)
+ self.assertShutdownCancellable(False)
+
+ def test_node_that_pairs_not_considered_failed_boot(self):
+ cloud_node = testutil.cloud_node_mock(3)
+ arv_node = testutil.arvados_node_mock(3)
+ setup = self.start_node_boot(cloud_node, arv_node)
+ self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+ self.daemon.update_cloud_nodes([cloud_node])
+ self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+ self.timer.deliver()
+ self.stop_proxy(self.daemon)
+ self.assertFalse(self.node_shutdown.start.called)
+
def test_booting_nodes_shut_down(self):
self.make_daemon(want_sizes=[testutil.MockSize(1)])
self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
self.assertTrue(new_node.stop_if_no_cloud_node.called)
self.daemon.node_up(new_node).get(self.TIMEOUT)
self.assertTrue(new_node.stop.called)
+ self.timer.deliver()
self.assertTrue(
self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
self.make_daemon(want_sizes=[size])
self.daemon.shutdown().get(self.TIMEOUT)
self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
+ self.timer.deliver()
self.stop_proxy(self.daemon)
self.assertEqual(1, self.node_setup.start.call_count)
from __future__ import absolute_import, print_function
+import threading
import time
import mock
class MockTimer(object):
+ def __init__(self, deliver_immediately=True):
+ self.deliver_immediately = deliver_immediately
+ self.messages = []
+ self.lock = threading.Lock()
+
+ def deliver(self):
+ with self.lock:
+ to_deliver = self.messages
+ self.messages = []
+ for callback, args, kwargs in to_deliver:
+ callback(*args, **kwargs)
+
def schedule(self, want_time, callback, *args, **kwargs):
- return callback(*args, **kwargs)
+ with self.lock:
+ self.messages.append((callback, args, kwargs))
+ if self.deliver_immediately:
+ self.deliver()
class ActorTestMixin(object):