class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
- def new_setup(self, **kwargs):
+ def mock_node_start(self, **kwargs):
# Make sure that every time the daemon starts a setup actor,
# it gets a new mock object back.
+ get_cloud_size = mock.MagicMock()
+ get_cloud_size.get.return_value = kwargs["cloud_size"]
+ mock_actor = mock.MagicMock()
+ mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
+ cloud_size=get_cloud_size,
+ actor_ref=mock_actor)
+ mock_actor.proxy.return_value = mock_proxy
+ mock_actor.tell_proxy.return_value = mock_proxy
+
+ self.last_setup = mock_proxy
+ return mock_actor
+
+ def mock_node_shutdown(self, **kwargs):
+ # Make sure that every time the daemon starts a shutdown actor,
+ # it gets a new mock object back.
get_cloud_node = mock.MagicMock()
- get_cloud_node.get.return_value = mock.NonCallableMock(size=kwargs["cloud_size"])
+ if "node_monitor" in kwargs:
+ get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
+ mock_actor = mock.MagicMock()
+ mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
+ cloud_node=get_cloud_node,
+ actor_ref=mock_actor)
- self.last_setup = mock.NonCallableMock(name='setup_mock',
- cloud_node=get_cloud_node)
- self.last_setup.proxy = mock.MagicMock(return_value = self.last_setup)
+ mock_actor.proxy.return_value = mock_proxy
+ self.last_shutdown = mock_proxy
- return self.last_setup
+ return mock_actor
def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
- avail_sizes=[(testutil.MockSize(1), {"cores": 1})], min_nodes=0, max_nodes=8):
+ avail_sizes=[(testutil.MockSize(1), {"cores": 1})],
+ min_nodes=0, max_nodes=8,
+ shutdown_windows=[54, 5, 1],
+ max_total_price=None):
for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
self.arv_factory = mock.MagicMock(name='arvados_mock')
+ api_client = mock.MagicMock(name='api_client')
+ api_client.nodes().create().execute.side_effect = [testutil.arvados_node_mock(1),
+ testutil.arvados_node_mock(2)]
+ self.arv_factory.return_value = api_client
+
self.cloud_factory = mock.MagicMock(name='cloud_mock')
self.cloud_factory().node_start_time.return_value = time.time()
self.cloud_updates = mock.MagicMock(name='updates_mock')
self.timer = testutil.MockTimer(deliver_immediately=False)
+ self.cloud_factory().node_id.side_effect = lambda node: node.id
self.node_setup = mock.MagicMock(name='setup_mock')
- self.node_setup.start.side_effect = self.new_setup
+ self.node_setup.start.side_effect = self.mock_node_start
self.node_setup.reset_mock()
+
self.node_shutdown = mock.MagicMock(name='shutdown_mock')
+ self.node_shutdown.start.side_effect = self.mock_node_shutdown
+
self.daemon = nmdaemon.NodeManagerDaemonActor.start(
self.server_wishlist_poller, self.arvados_nodes_poller,
self.cloud_nodes_poller, self.cloud_updates, self.timer,
self.arv_factory, self.cloud_factory,
- [54, 5, 1], ServerCalculator(avail_sizes),
+ shutdown_windows, ServerCalculator(avail_sizes),
min_nodes, max_nodes, 600, 1800, 3600,
- self.node_setup, self.node_shutdown).proxy()
- if cloud_nodes is not None:
- self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
+ self.node_setup, self.node_shutdown,
+ max_total_price=max_total_price).proxy()
if arvados_nodes is not None:
self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
+ if cloud_nodes is not None:
+ self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
if want_sizes is not None:
self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
def test_node_pairing_after_arvados_update(self):
cloud_node = testutil.cloud_node_mock(2)
self.make_daemon([cloud_node],
- [testutil.arvados_node_mock(2, ip_address=None)])
+ [testutil.arvados_node_mock(1, ip_address=None)])
arv_node = testutil.arvados_node_mock(2)
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
def test_old_arvados_node_not_double_assigned(self):
arv_node = testutil.arvados_node_mock(3, age=9000)
size = testutil.MockSize(3)
- self.make_daemon(arvados_nodes=[arv_node])
+ self.make_daemon(arvados_nodes=[arv_node], avail_sizes=[(size, {"cores":1})])
self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1),
testutil.cloud_node_mock(2)],
arvados_nodes=[testutil.arvados_node_mock(1),
- testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
+ testutil.arvados_node_mock(2,
+ last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size, size])
self.stop_proxy(self.daemon)
self.assertTrue(self.node_setup.start.called)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
- self.daemon.shutdowns.get()[cloud_nodes[1].id] = mock.MagicMock(name='shutdown_proxy_mock')
+
+ get_cloud_node = mock.MagicMock(name="get_cloud_node")
+ get_cloud_node.get.return_value = cloud_nodes[1]
+ mock_node_monitor = mock.MagicMock()
+ mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
+ mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
+
+ self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
+
self.assertEqual(2, self.alive_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
- self.assertEqual(0, self.node_shutdown.start.call_count)
+ self.assertEqual(1, self.node_shutdown.start.call_count)
def test_booting_nodes_counted(self):
cloud_node = testutil.cloud_node_mock(1)
self.assertEqual(1, self.node_setup.start.call_count)
def test_boot_new_node_when_all_nodes_busy(self):
+ size = testutil.MockSize(2)
arv_node = testutil.arvados_node_mock(2, job_uuid=True)
- self.make_daemon([testutil.cloud_node_mock(2)], [arv_node],
- [testutil.MockSize(2)])
+ self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
+ [size], avail_sizes=[(size, {"cores":1})])
+ self.busywait(lambda: self.node_setup.start.called)
self.stop_proxy(self.daemon)
self.assertTrue(self.node_setup.start.called)
cloud_node = testutil.cloud_node_mock(id_num)
if arv_node is None:
arv_node = testutil.arvados_node_mock(id_num)
- self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
+ self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
+ avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
self.daemon.max_nodes.get(self.TIMEOUT)
self.assertEqual(1, self.node_setup.start.call_count)
self.last_setup.cloud_node.get.return_value = cloud_node
self.last_setup.arvados_node.get.return_value = arv_node
return self.last_setup
- def test_no_new_node_when_booted_node_not_usable(self):
+ def test_new_node_when_booted_node_not_usable(self):
cloud_node = testutil.cloud_node_mock(4)
arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
setup = self.start_node_boot(cloud_node, arv_node)
self.daemon.node_up(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
- self.daemon.update_cloud_nodes([cloud_node])
self.daemon.update_arvados_nodes([arv_node])
+ self.daemon.update_cloud_nodes([cloud_node])
+ self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-1801
self.daemon.update_server_wishlist(
[testutil.MockSize(1)]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
- self.assertEqual(1, self.node_setup.start.call_count)
+ self.assertEqual(2, self.node_setup.start.call_count)
def test_no_duplication_when_booting_node_listed_fast(self):
# Test that we don't start two ComputeNodeMonitorActors when
shutdown = self.node_shutdown.start().proxy()
shutdown.cloud_node.get.return_value = cloud_node
self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+ self.daemon.update_cloud_nodes([])
self.assertTrue(shutdown.stop.called,
"shutdown actor not stopped after finishing")
self.assertTrue(monitor.actor_ref.actor_stopped.wait(self.TIMEOUT),
def test_booted_node_shut_down_when_never_listed(self):
setup = self.start_node_boot()
+ self.cloud_factory().node_start_time.return_value = time.time() - 3601
self.daemon.node_up(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
self.assertFalse(self.node_shutdown.start.called)
- self.timer.deliver()
+ now = time.time()
+ self.monitor_list()[0].tell_proxy().consider_shutdown()
+ self.busywait(lambda: self.node_shutdown.start.called)
self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_booted_node_shut_down_when_never_paired(self):
cloud_node = testutil.cloud_node_mock(2)
setup = self.start_node_boot(cloud_node)
+ self.cloud_factory().node_start_time.return_value = time.time() - 3601
self.daemon.node_up(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
self.daemon.update_cloud_nodes([cloud_node])
- self.timer.deliver()
+ self.monitor_list()[0].tell_proxy().consider_shutdown()
+ self.busywait(lambda: self.node_shutdown.start.called)
self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
cloud_node = testutil.cloud_node_mock(4)
arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
setup = self.start_node_boot(cloud_node, arv_node)
+ self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
self.daemon.node_up(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
+ self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
self.daemon.update_cloud_nodes([cloud_node])
- self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
- self.timer.deliver()
+ self.busywait(lambda: self.node_shutdown.start.called)
self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_all_booting_nodes_tried_to_shut_down(self):
size = testutil.MockSize(2)
- self.make_daemon(want_sizes=[size])
+ self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
self.daemon.max_nodes.get(self.TIMEOUT)
setup1 = self.last_setup
setup1.stop_if_no_cloud_node().get.return_value = False
def test_shutdown_declined_at_wishlist_capacity(self):
cloud_node = testutil.cloud_node_mock(1)
+ arv_node = testutil.arvados_node_mock(1)
size = testutil.MockSize(1)
- self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
+ self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
self.assertEqual(1, self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
def test_shutdown_declined_below_min_nodes(self):
cloud_node = testutil.cloud_node_mock(1)
- self.make_daemon(cloud_nodes=[cloud_node], min_nodes=1)
+ arv_node = testutil.arvados_node_mock(1)
+ self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
self.assertEqual(1, self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
self.assertEqual(1, self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
- shutdown_proxy = self.node_shutdown.start().proxy
- shutdown_proxy().cloud_node.get.return_value = cloud_node
- shutdown_proxy().success.get.return_value = False
- shutdown_proxy.reset_mock()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
- self.assertTrue(shutdown_proxy.called)
- self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
- shutdown_proxy().success.get.return_value = True
- shutdown_proxy.reset_mock()
+ self.last_shutdown.success.get.return_value = False
+ self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
- self.assertTrue(shutdown_proxy.called)
+ self.last_shutdown.success.get.return_value = True
+ self.last_shutdown.stop.side_effect = lambda: monitor.stop()
+ self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
+ self.assertEqual(0, self.alive_monitor_count())
def test_broken_node_blackholed_after_cancelled_shutdown(self):
- cloud_node = testutil.cloud_node_mock(8)
- wishlist = [testutil.MockSize(8)]
+ size = testutil.MockSize(8)
+ cloud_node = testutil.cloud_node_mock(8, size=size)
+ wishlist = [size]
self.make_daemon([cloud_node], [testutil.arvados_node_mock(8)],
- wishlist)
+ wishlist, avail_sizes=[(size, {"cores":1})])
self.assertEqual(1, self.alive_monitor_count())
self.assertFalse(self.node_setup.start.called)
monitor = self.monitor_list()[0].proxy()
self.assertEqual(1, self.node_setup.start.call_count)
def test_nodes_shutting_down_replaced_below_max_nodes(self):
- cloud_node = testutil.cloud_node_mock(6)
- self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)])
+ size = testutil.MockSize(6)
+ cloud_node = testutil.cloud_node_mock(6, size=size)
+ self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
+ avail_sizes=[(size, {"cores":1})])
self.assertEqual(1, self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.assertFalse(self.node_setup.start.called)
def test_nodes_shutting_down_count_against_excess(self):
- cloud_nodes = [testutil.cloud_node_mock(n) for n in [8, 9]]
- arv_nodes = [testutil.arvados_node_mock(n) for n in [8, 9]]
- self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
+ size = testutil.MockSize(8)
+ cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
+ arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
+ self.make_daemon(cloud_nodes, arv_nodes, [size],
+ avail_sizes=[(size, {"cores":1})])
self.assertEqual(2, self.alive_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
self.assertEqual(
- 1, self.node_shutdown.start().proxy().stop().get.call_count)
+ 1, self.last_shutdown.stop.call_count)
def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
# We're mainly testing that update_cloud_nodes catches and handles
# the ActorDeadError.
- stop_method = self.node_shutdown.start().proxy().stop().get
- stop_method.side_effect = pykka.ActorDeadError
+ self.last_shutdown.stop.side_effect = pykka.ActorDeadError
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
- self.assertEqual(1, stop_method.call_count)
+ self.assertEqual(1, self.last_shutdown.stop.call_count)
+
+ def test_node_create_two_sizes(self):
+ small = testutil.MockSize(1)
+ big = testutil.MockSize(2)
+ avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+ (testutil.MockSize(2), {"cores":2})]
+ self.make_daemon(want_sizes=[small, small, small, big],
+ avail_sizes=avail_sizes, max_nodes=4)
+
+ # the daemon runs in another thread, so we need to wait and see
+ # if it does all the work we're expecting it to do before stopping it.
+ self.busywait(lambda: self.node_setup.start.call_count == 4)
+ booting = self.daemon.booting.get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in booting.itervalues():
+ sizecounts[b.cloud_size.get().id] += 1
+ logging.info(sizecounts)
+ self.assertEqual(3, sizecounts[small.id])
+ self.assertEqual(1, sizecounts[big.id])
+
+ def test_node_max_nodes_two_sizes(self):
+ small = testutil.MockSize(1)
+ big = testutil.MockSize(2)
+ avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+ (testutil.MockSize(2), {"cores":2})]
+ self.make_daemon(want_sizes=[small, small, small, big],
+ avail_sizes=avail_sizes, max_nodes=3)
+
+ # the daemon runs in another thread, so we need to wait and see
+ # if it does all the work we're expecting it to do before stopping it.
+ self.busywait(lambda: self.node_setup.start.call_count == 3)
+ booting = self.daemon.booting.get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in booting.itervalues():
+ sizecounts[b.cloud_size.get().id] += 1
+ self.assertEqual(2, sizecounts[small.id])
+ self.assertEqual(1, sizecounts[big.id])
+
+ def test_wishlist_reconfigure(self):
+ small = testutil.MockSize(1)
+ big = testutil.MockSize(2)
+ avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
+
+ self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
+ testutil.cloud_node_mock(2, small),
+ testutil.cloud_node_mock(3, big)],
+ arvados_nodes=[testutil.arvados_node_mock(1),
+ testutil.arvados_node_mock(2),
+ testutil.arvados_node_mock(3)],
+ want_sizes=[small, small, big],
+ avail_sizes=avail_sizes)
+
+ self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
+
+ self.assertEqual(0, self.node_shutdown.start.call_count)
+
+ for c in self.daemon.cloud_nodes.get().nodes.itervalues():
+ self.daemon.node_can_shutdown(c.actor)
+
+ booting = self.daemon.booting.get()
+ cloud_nodes = self.daemon.cloud_nodes.get()
+
+ self.stop_proxy(self.daemon)
+
+ self.assertEqual(1, self.node_setup.start.call_count)
+ self.assertEqual(1, self.node_shutdown.start.call_count)
+
+ # booting a new big node
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in booting.itervalues():
+ sizecounts[b.cloud_size.get().id] += 1
+ self.assertEqual(0, sizecounts[small.id])
+ self.assertEqual(1, sizecounts[big.id])
+
+ # shutting down a small node
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in cloud_nodes.nodes.itervalues():
+ if b.shutdown_actor is not None:
+ sizecounts[b.cloud_node.size.id] += 1
+ self.assertEqual(1, sizecounts[small.id])
+ self.assertEqual(0, sizecounts[big.id])
+
+ def test_node_max_price(self):
+ small = testutil.MockSize(1)
+ big = testutil.MockSize(2)
+ avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
+ (testutil.MockSize(2), {"cores":2, "price":2})]
+ self.make_daemon(want_sizes=[small, small, small, big],
+ avail_sizes=avail_sizes,
+ max_nodes=4,
+ max_total_price=4)
+ # the daemon runs in another thread, so we need to wait and see
+ # if it does all the work we're expecting it to do before stopping it.
+ self.busywait(lambda: self.node_setup.start.call_count == 3)
+ booting = self.daemon.booting.get()
+ self.stop_proxy(self.daemon)
+
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in booting.itervalues():
+ sizecounts[b.cloud_size.get().id] += 1
+ logging.info(sizecounts)
+
+ # Booting 3 small nodes and not booting a big node would also partially
+ # satisfy the wishlist and come in under the price cap. However, the way
+ # update_server_wishlist() currently works effectively results in
+ # round-robin creation of one node of each size in the wishlist, so
+ # test for that.
+ self.assertEqual(2, sizecounts[small.id])
+ self.assertEqual(1, sizecounts[big.id])