4139: Speed up Node Manager tests.
authorBrett Smith <brett@curoverse.com>
Mon, 13 Oct 2014 19:19:15 +0000 (15:19 -0400)
committerBrett Smith <brett@curoverse.com>
Tue, 14 Oct 2014 14:19:36 +0000 (10:19 -0400)
Previously, the tests would poll interesting mocks, waiting for them
to be called.  This introduces significant overhead to the tests, and
they would frequently time out on Jenkins.  This modifies the tests to
get more information by blocking on the tested actors, which means
more predictability and less fighting for CPU (typical runtimes for
all the tests improved from 5 seconds to 0.5 seconds on my
workstation).

The downside to this approach is that it ties the tests more tightly
to the underlying actors' implementation.  In particular, they
sometimes send a message and block for a response to ensure that any
internal messages generated by the *last* message have been handled.
This is less than ideal, but I don't have a better idea right now.

services/nodemanager/tests/__init__.py
services/nodemanager/tests/test_clientactor.py
services/nodemanager/tests/test_computenode.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_jobqueue.py
services/nodemanager/tests/test_nodelist.py
services/nodemanager/tests/test_timedcallback.py
services/nodemanager/tests/testutil.py

index d374840eafa870c09bc839dcf6495665509e8416..c5eaf7636c7082bb08a8c78fd1485a71a62ace1c 100644 (file)
@@ -7,10 +7,9 @@ import os
 loglevel = os.environ.get('ANMTEST_LOGLEVEL', 'CRITICAL')
 logging.basicConfig(level=getattr(logging, loglevel.upper()))
 
-# Many tests wait for an actor to call a mock method.  They poll very
-# regularly (see wait_for_call in ActorTestMixin), but if you've
-# broken something, a long timeout can mean you'll spend a lot of time
-# watching failures come in.  You can set the ANMTEST_TIMEOUT
-# environment variable to arrange a shorter timeout while you're doing
-# regular development.
+# Set the ANM_TIMEOUT environment variable to the maximum amount of time to
+# wait for tested actors to respond to important messages.  The default value
+# is very conservative, because a small value may produce false negatives on
+# slower systems.  If you're debugging a known timeout issue, however, you may
+# want to set this lower to speed up tests.
 pykka_timeout = int(os.environ.get('ANMTEST_TIMEOUT', '10'))
index 0db0a33e3d83a203c103bc2a31e806f6a819691e..1e4c40ec6b94227e868880c87b8f9973aa63312d 100644 (file)
@@ -30,26 +30,27 @@ class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     def test_poll_loop_starts_after_subscription(self):
         self.build_monitor(['test1'])
-        self.monitor.subscribe(self.subscriber)
-        self.wait_for_call(self.subscriber)
+        self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
         self.subscriber.assert_called_with('test1')
-        self.wait_for_call(self.timer.schedule)
+        self.assertTrue(self.timer.schedule.called)
 
     def test_poll_loop_continues_after_failure(self):
         self.build_monitor(self.MockClientError)
-        self.monitor.subscribe(self.subscriber)
-        self.wait_for_call(self.timer.schedule)
-        self.assertTrue(self.monitor.actor_ref.is_alive(),
+        self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+        self.assertTrue(self.stop_proxy(self.monitor),
                         "poll loop died after error")
+        self.assertTrue(self.timer.schedule.called,
+                        "poll loop did not reschedule after error")
         self.assertFalse(self.subscriber.called,
                          "poll loop notified subscribers after error")
 
     def test_late_subscribers_get_responses(self):
-        self.build_monitor(['late_test'])
-        self.monitor.subscribe(lambda response: None)
+        self.build_monitor(['pre_late_test', 'late_test'])
+        self.monitor.subscribe(lambda response: None).get(self.TIMEOUT)
         self.monitor.subscribe(self.subscriber)
-        self.monitor.poll()
-        self.wait_for_call(self.subscriber)
+        self.monitor.poll().get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
         self.subscriber.assert_called_with('late_test')
 
     def test_survive_dead_subscriptions(self):
@@ -57,13 +58,11 @@ class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
         dead_subscriber = mock.Mock(name='dead_subscriber')
         dead_subscriber.side_effect = pykka.ActorDeadError
         self.monitor.subscribe(dead_subscriber)
-        self.wait_for_call(dead_subscriber)
         self.monitor.subscribe(self.subscriber)
-        self.monitor.poll()
-        self.wait_for_call(self.subscriber)
-        self.subscriber.assert_called_with('survive2')
-        self.assertTrue(self.monitor.actor_ref.is_alive(),
+        self.monitor.poll().get(self.TIMEOUT)
+        self.assertTrue(self.stop_proxy(self.monitor),
                         "poll loop died from dead subscriber")
+        self.subscriber.assert_called_with('survive2')
 
     def test_no_subscriptions_by_key_without_support(self):
         self.build_monitor([])
@@ -86,8 +85,8 @@ class RemotePollLoopActorWithKeysTestCase(testutil.RemotePollLoopActorTestMixin,
 
     def test_key_subscription(self):
         self.build_monitor([[{'key': 1}, {'key': 2}]])
-        self.monitor.subscribe_to(2, self.subscriber)
-        self.wait_for_call(self.subscriber)
+        self.monitor.subscribe_to(2, self.subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
         self.subscriber.assert_called_with({'key': 2})
 
     def test_survive_dead_key_subscriptions(self):
@@ -96,13 +95,11 @@ class RemotePollLoopActorWithKeysTestCase(testutil.RemotePollLoopActorTestMixin,
         dead_subscriber = mock.Mock(name='dead_subscriber')
         dead_subscriber.side_effect = pykka.ActorDeadError
         self.monitor.subscribe_to(3, dead_subscriber)
-        self.wait_for_call(dead_subscriber)
         self.monitor.subscribe_to(3, self.subscriber)
-        self.monitor.poll()
-        self.wait_for_call(self.subscriber)
-        self.subscriber.assert_called_with(item)
-        self.assertTrue(self.monitor.actor_ref.is_alive(),
+        self.monitor.poll().get(self.TIMEOUT)
+        self.assertTrue(self.stop_proxy(self.monitor),
                         "poll loop died from dead key subscriber")
+        self.subscriber.assert_called_with(item)
 
     def test_mixed_subscriptions(self):
         item = {'key': 4}
@@ -110,15 +107,15 @@ class RemotePollLoopActorWithKeysTestCase(testutil.RemotePollLoopActorTestMixin,
         key_subscriber = mock.Mock(name='key_subscriber')
         self.monitor.subscribe(self.subscriber)
         self.monitor.subscribe_to(4, key_subscriber)
-        self.monitor.poll()
-        self.wait_for_call(self.subscriber)
+        self.monitor.poll().get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
         self.subscriber.assert_called_with([item])
         key_subscriber.assert_called_with(item)
 
     def test_subscription_to_missing_key(self):
         self.build_monitor([[]])
-        self.monitor.subscribe_to('nonesuch', self.subscriber)
-        self.wait_for_call(self.subscriber)
+        self.monitor.subscribe_to('nonesuch', self.subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
         self.subscriber.assert_called_with(None)
 
 
index 2fc7a504f42e423980debbb7efc8a634ec3ef3d8..477e20eb5741e45772f30a9afa2dab589080e61a 100644 (file)
@@ -16,7 +16,8 @@ from . import testutil
 class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
     def make_mocks(self, arvados_effect=None, cloud_effect=None):
         if arvados_effect is None:
-            arvados_effect = testutil.arvados_node_mock()
+            arvados_effect = [testutil.arvados_node_mock()]
+        self.arvados_effect = arvados_effect
         self.timer = testutil.MockTimer()
         self.api_client = mock.MagicMock(name='api_client')
         self.api_client.nodes().create().execute.side_effect = arvados_effect
@@ -26,22 +27,26 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
 
     def make_actor(self, arv_node=None):
         if not hasattr(self, 'timer'):
-            self.make_mocks()
+            self.make_mocks(arvados_effect=[arv_node])
         self.setup_actor = cnode.ComputeNodeSetupActor.start(
             self.timer, self.api_client, self.cloud_client,
             testutil.MockSize(1), arv_node).proxy()
 
     def test_creation_without_arvados_node(self):
         self.make_actor()
-        self.wait_for_call(self.api_client.nodes().create().execute)
-        self.wait_for_call(self.cloud_client.create_node)
+        self.assertEqual(self.arvados_effect[-1],
+                         self.setup_actor.arvados_node.get(self.TIMEOUT))
+        self.assertTrue(self.api_client.nodes().create().execute.called)
+        self.assertEqual(self.cloud_client.create_node(),
+                         self.setup_actor.cloud_node.get(self.TIMEOUT))
 
     def test_creation_with_arvados_node(self):
-        arv_node = testutil.arvados_node_mock()
-        self.make_mocks([arv_node])
-        self.make_actor(arv_node)
-        self.wait_for_call(self.api_client.nodes().update().execute)
-        self.wait_for_call(self.cloud_client.create_node)
+        self.make_actor(testutil.arvados_node_mock())
+        self.assertEqual(self.arvados_effect[-1],
+                         self.setup_actor.arvados_node.get(self.TIMEOUT))
+        self.assertTrue(self.api_client.nodes().update().execute.called)
+        self.assertEqual(self.cloud_client.create_node(),
+                         self.setup_actor.cloud_node.get(self.TIMEOUT))
 
     def test_failed_calls_retried(self):
         self.make_mocks([
@@ -49,22 +54,22 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
                 testutil.arvados_node_mock(),
                 ])
         self.make_actor()
-        self.wait_for_call(self.cloud_client.create_node)
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
 
     def test_stop_when_no_cloud_node(self):
         self.make_mocks(
             arverror.ApiError(httplib2.Response({'status': '500'}), ""))
         self.make_actor()
-        self.wait_for_call(self.api_client.nodes().create().execute)
         self.setup_actor.stop_if_no_cloud_node()
         self.assertTrue(
             self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
 
     def test_no_stop_when_cloud_node(self):
         self.make_actor()
-        self.wait_for_call(self.cloud_client.create_node)
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
         self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
-        self.assertFalse(self.setup_actor.actor_ref.actor_stopped.is_set())
+        self.assertTrue(self.stop_proxy(self.setup_actor),
+                        "actor was stopped by stop_if_no_cloud_node")
 
     def test_subscribe(self):
         self.make_mocks(
@@ -74,16 +79,16 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         self.setup_actor.subscribe(subscriber)
         self.api_client.nodes().create().execute.side_effect = [
             testutil.arvados_node_mock()]
-        self.wait_for_call(subscriber)
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
         self.assertEqual(self.setup_actor.actor_ref.actor_urn,
                          subscriber.call_args[0][0].actor_ref.actor_urn)
 
     def test_late_subscribe(self):
         self.make_actor()
         subscriber = mock.Mock(name='subscriber_mock')
-        self.wait_for_call(self.cloud_client.create_node)
-        self.setup_actor.subscribe(subscriber)
-        self.wait_for_call(subscriber)
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.setup_actor)
         self.assertEqual(self.setup_actor.actor_ref.actor_urn,
                          subscriber.call_args[0][0].actor_ref.actor_urn)
 
@@ -105,7 +110,9 @@ class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
 
     def test_easy_shutdown(self):
         self.make_actor()
-        self.wait_for_call(self.cloud_client.destroy_node)
+        self.shutdown_actor.cloud_node.get(self.TIMEOUT)
+        self.stop_proxy(self.shutdown_actor)
+        self.assertTrue(self.cloud_client.destroy_node.called)
 
 
 class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
@@ -168,43 +175,44 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
             self.make_mocks(node_num)
         if start_time is None:
             start_time = time.time()
-        start_time = time.time()
         self.node_actor = cnode.ComputeNodeMonitorActor.start(
             self.cloud_mock, start_time, self.shutdowns, self.timer,
             self.updates, arv_node).proxy()
-        self.node_actor.subscribe(self.subscriber)
+        self.subscription = self.node_actor.subscribe(self.subscriber)
 
     def test_init_shutdown_scheduling(self):
         self.make_actor()
-        self.wait_for_call(self.timer.schedule)
+        self.subscription.get(self.TIMEOUT)
+        self.assertTrue(self.timer.schedule.called)
         self.assertEqual(300, self.timer.schedule.call_args[0][0])
 
     def test_shutdown_subscription(self):
         self.make_actor()
         self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown()
-        self.wait_for_call(self.subscriber)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.assertTrue(self.subscriber.called)
         self.assertEqual(self.node_actor.actor_ref.actor_urn,
                          self.subscriber.call_args[0][0].actor_ref.actor_urn)
 
     def test_shutdown_without_arvados_node(self):
         self.make_actor()
         self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown()
-        self.wait_for_call(self.subscriber)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.assertTrue(self.subscriber.called)
 
     def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
         self.make_actor(start_time=0)
         self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown()
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
         self.assertFalse(self.subscriber.called)
 
     def check_shutdown_rescheduled(self, window_open, next_window,
                                    schedule_time=None):
         self.shutdowns._set_state(window_open, next_window)
         self.timer.schedule.reset_mock()
-        self.node_actor.consider_shutdown()
-        self.wait_for_call(self.timer.schedule)
+        self.node_actor.consider_shutdown().get(self.TIMEOUT)
+        self.stop_proxy(self.node_actor)
+        self.assertTrue(self.timer.schedule.called)
         if schedule_time is not None:
             self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
         self.assertFalse(self.subscriber.called)
@@ -229,16 +237,16 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.make_actor(2)
         arv_node = testutil.arvados_node_mock(
             2, hostname='compute-two.zzzzz.arvadosapi.com')
-        pair_future = self.node_actor.offer_arvados_pair(arv_node)
-        self.assertEqual(self.cloud_mock.id, pair_future.get(self.TIMEOUT))
-        self.wait_for_call(self.updates.sync_node)
+        pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
+        self.assertEqual(self.cloud_mock.id, pair_id)
+        self.stop_proxy(self.node_actor)
         self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
 
     def test_arvados_node_mismatch(self):
         self.make_actor(3)
         arv_node = testutil.arvados_node_mock(1)
-        pair_future = self.node_actor.offer_arvados_pair(arv_node)
-        self.assertIsNone(pair_future.get(self.TIMEOUT))
+        self.assertIsNone(
+            self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
 
     def test_update_cloud_node(self):
         self.make_actor(1)
index 176b096714cb440d542f71416145731598518908..0a63222afcb53ad781b7a208f0b92498a2a3ba19 100644 (file)
@@ -30,25 +30,25 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             [54, 5, 1], 8, 600, 3600,
             self.node_setup, self.node_shutdown, self.node_factory).proxy()
         if cloud_nodes is not None:
-            self.daemon.update_cloud_nodes(cloud_nodes)
+            self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
         if arvados_nodes is not None:
-            self.daemon.update_arvados_nodes(arvados_nodes)
+            self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
         if want_sizes is not None:
-            self.daemon.update_server_wishlist(want_sizes)
+            self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
 
     def test_easy_node_creation(self):
         size = testutil.MockSize(1)
         self.make_daemon(want_sizes=[size])
-        self.wait_for_call(self.node_setup.start)
+        self.stop_proxy(self.daemon)
+        self.assertTrue(self.node_setup.start.called)
 
     def test_node_pairing(self):
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon([cloud_node], [arv_node])
-        self.wait_for_call(self.node_factory.start)
-        pair_func = self.node_factory.start().proxy().offer_arvados_pair
-        self.wait_for_call(pair_func)
-        pair_func.assert_called_with(arv_node)
+        self.stop_proxy(self.daemon)
+        self.node_factory.start().proxy().offer_arvados_pair.assert_called_with(
+            arv_node)
 
     def test_node_pairing_after_arvados_update(self):
         cloud_node = testutil.cloud_node_mock(2)
@@ -68,21 +68,25 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_old_arvados_node_not_double_assigned(self):
         arv_node = testutil.arvados_node_mock(3, age=9000)
         size = testutil.MockSize(3)
-        self.make_daemon(arvados_nodes=[arv_node], want_sizes=[size, size])
-        node_starter = self.node_setup.start
-        deadline = time.time() + self.TIMEOUT
-        while (time.time() < deadline) and (node_starter.call_count < 2):
-            time.sleep(.1)
-        self.assertEqual(2, node_starter.call_count)
+        self.make_daemon(arvados_nodes=[arv_node])
+        setup_ref = self.node_setup.start().proxy().actor_ref
+        setup_ref.actor_urn = 0
+        self.node_setup.start.reset_mock()
+        self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
+        self.daemon.max_nodes.get(self.TIMEOUT)
+        setup_ref.actor_urn += 1
+        self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
         used_nodes = [call[1].get('arvados_node')
-                      for call in node_starter.call_args_list]
+                      for call in self.node_setup.start.call_args_list]
+        self.assertEqual(2, len(used_nodes))
         self.assertIn(arv_node, used_nodes)
         self.assertIn(None, used_nodes)
 
     def test_node_count_satisfied(self):
-        self.make_daemon([testutil.cloud_node_mock()])
-        self.daemon.update_server_wishlist(
-            [testutil.MockSize(1)]).get(self.TIMEOUT)
+        self.make_daemon([testutil.cloud_node_mock()],
+                         want_sizes=[testutil.MockSize(1)])
+        self.stop_proxy(self.daemon)
         self.assertFalse(self.node_setup.called)
 
     def test_booting_nodes_counted(self):
@@ -90,10 +94,11 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(1)
         server_wishlist = [testutil.MockSize(1)] * 2
         self.make_daemon([cloud_node], [arv_node], server_wishlist)
-        self.wait_for_call(self.node_setup.start)
-        self.node_setup.reset_mock()
+        self.daemon.max_nodes.get(self.TIMEOUT)
+        self.assertTrue(self.node_setup.start.called)
         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
-        self.assertFalse(self.node_setup.called)
+        self.stop_proxy(self.daemon)
+        self.assertEqual(1, self.node_setup.start.call_count)
 
     def test_no_duplication_when_booting_node_listed_fast(self):
         # Test that we don't start two ComputeNodeMonitorActors when
@@ -101,23 +106,23 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         # get the "node up" message from CloudNodeSetupActor.
         cloud_node = testutil.cloud_node_mock(1)
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
-        self.wait_for_call(self.node_setup.start)
+        self.daemon.max_nodes.get(self.TIMEOUT)
+        self.assertEqual(1, self.node_setup.start.call_count)
         setup = mock.MagicMock(name='setup_node_mock')
         setup.actor_ref = self.node_setup.start().proxy().actor_ref
         setup.cloud_node.get.return_value = cloud_node
         setup.arvados_node.get.return_value = testutil.arvados_node_mock(1)
-        self.daemon.update_cloud_nodes([cloud_node])
-        self.wait_for_call(self.node_factory.start)
-        self.node_factory.reset_mock()
+        self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
+        self.assertTrue(self.node_factory.start.called)
         self.daemon.node_up(setup).get(self.TIMEOUT)
-        self.assertFalse(self.node_factory.start.called)
+        self.assertEqual(1, self.node_factory.start.call_count)
 
     def test_booting_nodes_shut_down(self):
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
-        self.wait_for_call(self.node_setup.start)
-        self.daemon.update_server_wishlist([])
-        self.wait_for_call(
-            self.node_setup.start().proxy().stop_if_no_cloud_node)
+        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        self.assertTrue(
+            self.node_setup.start().proxy().stop_if_no_cloud_node.called)
 
     def test_shutdown_declined_at_wishlist_capacity(self):
         cloud_node = testutil.cloud_node_mock(1)
@@ -125,34 +130,32 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
         node_actor = self.node_factory().proxy()
         self.daemon.node_can_shutdown(node_actor).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
         self.assertFalse(node_actor.shutdown.called)
 
     def test_shutdown_accepted_below_capacity(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
         node_actor = self.node_factory().proxy()
-        self.daemon.node_can_shutdown(node_actor)
-        self.wait_for_call(self.node_shutdown.start)
+        self.daemon.node_can_shutdown(node_actor).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        self.assertTrue(self.node_shutdown.start.called)
 
     def test_clean_shutdown_waits_for_node_setup_finish(self):
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
-        self.wait_for_call(self.node_setup.start)
+        self.daemon.max_nodes.get(self.TIMEOUT)
+        self.assertTrue(self.node_setup.start.called)
         new_node = self.node_setup.start().proxy()
-        self.daemon.shutdown()
-        self.wait_for_call(new_node.stop_if_no_cloud_node)
-        self.daemon.node_up(new_node)
-        self.wait_for_call(new_node.stop)
+        self.daemon.shutdown().get(self.TIMEOUT)
+        self.assertTrue(new_node.stop_if_no_cloud_node.called)
+        self.daemon.node_up(new_node).get(self.TIMEOUT)
+        self.assertTrue(new_node.stop.called)
         self.assertTrue(
             self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
 
     def test_wishlist_ignored_after_shutdown(self):
         size = testutil.MockSize(2)
         self.make_daemon(want_sizes=[size])
-        node_starter = self.node_setup.start
-        self.wait_for_call(node_starter)
-        node_starter.reset_mock()
-        self.daemon.shutdown()
-        self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
-        # Send another message and wait for a response, to make sure all
-        # internal messages generated by the wishlist update are processed.
+        self.daemon.shutdown().get(self.TIMEOUT)
         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
-        self.assertFalse(node_starter.called)
+        self.stop_proxy(self.daemon)
+        self.assertEqual(1, self.node_setup.start.call_count)
index 3814ba4610a3db416c01d5e3ddb2c5d31de435b2..b27f69f15c2431493a49ecb5daf09f50ea710187 100644 (file)
@@ -63,8 +63,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     def test_subscribers_get_server_lists(self):
         self.build_monitor([{'items': [1, 2]}], self.MockCalculator())
-        self.monitor.subscribe(self.subscriber)
-        self.wait_for_call(self.subscriber)
+        self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
         self.subscriber.assert_called_with([testutil.MockSize(1),
                                             testutil.MockSize(2)])
 
index d9f47e2605286bc00ffa8b51e26092d764c8e8e2..5346e7ab7bbba0a6ae8e353e566454134e2617da 100644 (file)
@@ -19,8 +19,9 @@ class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
     def test_uuid_is_subscription_key(self):
         node = testutil.arvados_node_mock()
         self.build_monitor([{'items': [node]}])
-        self.monitor.subscribe_to(node['uuid'], self.subscriber)
-        self.wait_for_call(self.subscriber)
+        self.monitor.subscribe_to(node['uuid'],
+                                  self.subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
         self.subscriber.assert_called_with(node)
 
 
@@ -46,8 +47,8 @@ class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
     def test_id_is_subscription_key(self):
         node = self.MockNode(1)
         self.build_monitor([[node]])
-        self.monitor.subscribe_to('1', self.subscriber)
-        self.wait_for_call(self.subscriber)
+        self.monitor.subscribe_to('1', self.subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
         self.subscriber.assert_called_with(node)
 
 
index 60f7b81bade33c22306a5c9239cb52bf67b595b1..1d1e6c3b116634ccae6442cb1e37f925740c0ee6 100644 (file)
@@ -14,40 +14,46 @@ from . import testutil
 @testutil.no_sleep
 class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
     def test_immediate_turnaround(self):
-        future = self.FUTURE_CLASS()
+        receiver = mock.Mock()
         deliverer = timedcallback.TimedCallBackActor.start().proxy()
-        deliverer.schedule(time.time() - 1, future.set, 'immediate')
-        self.assertEqual('immediate', future.get(self.TIMEOUT))
+        deliverer.schedule(time.time() - 1, receiver,
+                           'immediate').get(self.TIMEOUT)
+        self.stop_proxy(deliverer)
+        receiver.assert_called_with('immediate')
 
     def test_delayed_turnaround(self):
-        future = self.FUTURE_CLASS()
+        receiver = mock.Mock()
         with mock.patch('time.time', return_value=0) as mock_now:
             deliverer = timedcallback.TimedCallBackActor.start().proxy()
-            deliverer.schedule(1, future.set, 'delayed')
-            self.assertRaises(pykka.Timeout, future.get, .5)
+            deliverer.schedule(1, receiver, 'delayed')
+            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+            self.assertFalse(receiver.called)
             mock_now.return_value = 2
-            self.assertEqual('delayed', future.get(self.TIMEOUT))
+            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+            self.stop_proxy(deliverer)
+        receiver.assert_called_with('delayed')
 
     def test_out_of_order_scheduling(self):
-        future1 = self.FUTURE_CLASS()
-        future2 = self.FUTURE_CLASS()
+        receiver = mock.Mock()
         with mock.patch('time.time', return_value=1.5) as mock_now:
             deliverer = timedcallback.TimedCallBackActor.start().proxy()
-            deliverer.schedule(2, future2.set, 'second')
-            deliverer.schedule(1, future1.set, 'first')
-            self.assertEqual('first', future1.get(self.TIMEOUT))
-            self.assertRaises(pykka.Timeout, future2.get, .1)
-            mock_now.return_value = 3
-            self.assertEqual('second', future2.get(self.TIMEOUT))
+            deliverer.schedule(2, receiver, 'second')
+            deliverer.schedule(1, receiver, 'first')
+            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+            receiver.assert_called_with('first')
+            mock_now.return_value = 2.5
+            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+            self.stop_proxy(deliverer)
+        receiver.assert_called_with('second')
 
     def test_dead_actors_ignored(self):
         receiver = mock.Mock(name='dead_actor', spec=pykka.ActorRef)
         receiver.tell.side_effect = pykka.ActorDeadError
         deliverer = timedcallback.TimedCallBackActor.start().proxy()
-        deliverer.schedule(time.time() - 1, receiver.tell, 'error')
-        self.wait_for_call(receiver.tell)
+        deliverer.schedule(time.time() - 1, receiver.tell,
+                           'error').get(self.TIMEOUT)
+        self.assertTrue(self.stop_proxy(deliverer), "deliverer died")
         receiver.tell.assert_called_with('error')
-        self.assertTrue(deliverer.actor_ref.is_alive(), "deliverer died")
 
 
 if __name__ == '__main__':
index 7d6549decd1943189ce8491024924bfcfc085e1e..0c63db32b9ca6b3ca2aef249cbd2db6088d66d2d 100644 (file)
@@ -67,11 +67,19 @@ class ActorTestMixin(object):
     def tearDown(self):
         pykka.ActorRegistry.stop_all()
 
-    def wait_for_call(self, mock_func, timeout=TIMEOUT):
+    def stop_proxy(self, proxy):
+        return proxy.actor_ref.stop(timeout=self.TIMEOUT)
+
+    def wait_for_assignment(self, proxy, attr_name, unassigned=None,
+                            timeout=TIMEOUT):
         deadline = time.time() + timeout
-        while (not mock_func.called) and (time.time() < deadline):
-            time.sleep(.1)
-        self.assertTrue(mock_func.called, "{} not called".format(mock_func))
+        while True:
+            loop_timeout = deadline - time.time()
+            if loop_timeout <= 0:
+                self.fail("actor did not assign {} in time".format(attr_name))
+            result = getattr(proxy, attr_name).get(loop_timeout)
+            if result is not unassigned:
+                return result
 
 
 class RemotePollLoopActorTestMixin(ActorTestMixin):