loglevel = os.environ.get('ANMTEST_LOGLEVEL', 'CRITICAL')
logging.basicConfig(level=getattr(logging, loglevel.upper()))
-# Many tests wait for an actor to call a mock method. They poll very
-# regularly (see wait_for_call in ActorTestMixin), but if you've
-# broken something, a long timeout can mean you'll spend a lot of time
-# watching failures come in. You can set the ANMTEST_TIMEOUT
-# environment variable to arrange a shorter timeout while you're doing
-# regular development.
+# Set the ANM_TIMEOUT environment variable to the maximum amount of time to
+# wait for tested actors to respond to important messages. The default value
+# is very conservative, because a small value may produce false negatives on
+# slower systems. If you're debugging a known timeout issue, however, you may
+# want to set this lower to speed up tests.
pykka_timeout = int(os.environ.get('ANMTEST_TIMEOUT', '10'))
def test_poll_loop_starts_after_subscription(self):
self.build_monitor(['test1'])
- self.monitor.subscribe(self.subscriber)
- self.wait_for_call(self.subscriber)
+ self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
self.subscriber.assert_called_with('test1')
- self.wait_for_call(self.timer.schedule)
+ self.assertTrue(self.timer.schedule.called)
def test_poll_loop_continues_after_failure(self):
self.build_monitor(self.MockClientError)
- self.monitor.subscribe(self.subscriber)
- self.wait_for_call(self.timer.schedule)
- self.assertTrue(self.monitor.actor_ref.is_alive(),
+ self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.assertTrue(self.stop_proxy(self.monitor),
"poll loop died after error")
+ self.assertTrue(self.timer.schedule.called,
+ "poll loop did not reschedule after error")
self.assertFalse(self.subscriber.called,
"poll loop notified subscribers after error")
def test_late_subscribers_get_responses(self):
- self.build_monitor(['late_test'])
- self.monitor.subscribe(lambda response: None)
+ self.build_monitor(['pre_late_test', 'late_test'])
+ self.monitor.subscribe(lambda response: None).get(self.TIMEOUT)
self.monitor.subscribe(self.subscriber)
- self.monitor.poll()
- self.wait_for_call(self.subscriber)
+ self.monitor.poll().get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
self.subscriber.assert_called_with('late_test')
def test_survive_dead_subscriptions(self):
dead_subscriber = mock.Mock(name='dead_subscriber')
dead_subscriber.side_effect = pykka.ActorDeadError
self.monitor.subscribe(dead_subscriber)
- self.wait_for_call(dead_subscriber)
self.monitor.subscribe(self.subscriber)
- self.monitor.poll()
- self.wait_for_call(self.subscriber)
- self.subscriber.assert_called_with('survive2')
- self.assertTrue(self.monitor.actor_ref.is_alive(),
+ self.monitor.poll().get(self.TIMEOUT)
+ self.assertTrue(self.stop_proxy(self.monitor),
"poll loop died from dead subscriber")
+ self.subscriber.assert_called_with('survive2')
def test_no_subscriptions_by_key_without_support(self):
self.build_monitor([])
def test_key_subscription(self):
self.build_monitor([[{'key': 1}, {'key': 2}]])
- self.monitor.subscribe_to(2, self.subscriber)
- self.wait_for_call(self.subscriber)
+ self.monitor.subscribe_to(2, self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
self.subscriber.assert_called_with({'key': 2})
def test_survive_dead_key_subscriptions(self):
dead_subscriber = mock.Mock(name='dead_subscriber')
dead_subscriber.side_effect = pykka.ActorDeadError
self.monitor.subscribe_to(3, dead_subscriber)
- self.wait_for_call(dead_subscriber)
self.monitor.subscribe_to(3, self.subscriber)
- self.monitor.poll()
- self.wait_for_call(self.subscriber)
- self.subscriber.assert_called_with(item)
- self.assertTrue(self.monitor.actor_ref.is_alive(),
+ self.monitor.poll().get(self.TIMEOUT)
+ self.assertTrue(self.stop_proxy(self.monitor),
"poll loop died from dead key subscriber")
+ self.subscriber.assert_called_with(item)
def test_mixed_subscriptions(self):
item = {'key': 4}
key_subscriber = mock.Mock(name='key_subscriber')
self.monitor.subscribe(self.subscriber)
self.monitor.subscribe_to(4, key_subscriber)
- self.monitor.poll()
- self.wait_for_call(self.subscriber)
+ self.monitor.poll().get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
self.subscriber.assert_called_with([item])
key_subscriber.assert_called_with(item)
def test_subscription_to_missing_key(self):
self.build_monitor([[]])
- self.monitor.subscribe_to('nonesuch', self.subscriber)
- self.wait_for_call(self.subscriber)
+ self.monitor.subscribe_to('nonesuch', self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
self.subscriber.assert_called_with(None)
class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
def make_mocks(self, arvados_effect=None, cloud_effect=None):
if arvados_effect is None:
- arvados_effect = testutil.arvados_node_mock()
+ arvados_effect = [testutil.arvados_node_mock()]
+ self.arvados_effect = arvados_effect
self.timer = testutil.MockTimer()
self.api_client = mock.MagicMock(name='api_client')
self.api_client.nodes().create().execute.side_effect = arvados_effect
def make_actor(self, arv_node=None):
if not hasattr(self, 'timer'):
- self.make_mocks()
+ self.make_mocks(arvados_effect=[arv_node])
self.setup_actor = cnode.ComputeNodeSetupActor.start(
self.timer, self.api_client, self.cloud_client,
testutil.MockSize(1), arv_node).proxy()
def test_creation_without_arvados_node(self):
self.make_actor()
- self.wait_for_call(self.api_client.nodes().create().execute)
- self.wait_for_call(self.cloud_client.create_node)
+ self.assertEqual(self.arvados_effect[-1],
+ self.setup_actor.arvados_node.get(self.TIMEOUT))
+ self.assertTrue(self.api_client.nodes().create().execute.called)
+ self.assertEqual(self.cloud_client.create_node(),
+ self.setup_actor.cloud_node.get(self.TIMEOUT))
def test_creation_with_arvados_node(self):
- arv_node = testutil.arvados_node_mock()
- self.make_mocks([arv_node])
- self.make_actor(arv_node)
- self.wait_for_call(self.api_client.nodes().update().execute)
- self.wait_for_call(self.cloud_client.create_node)
+ self.make_actor(testutil.arvados_node_mock())
+ self.assertEqual(self.arvados_effect[-1],
+ self.setup_actor.arvados_node.get(self.TIMEOUT))
+ self.assertTrue(self.api_client.nodes().update().execute.called)
+ self.assertEqual(self.cloud_client.create_node(),
+ self.setup_actor.cloud_node.get(self.TIMEOUT))
def test_failed_calls_retried(self):
self.make_mocks([
testutil.arvados_node_mock(),
])
self.make_actor()
- self.wait_for_call(self.cloud_client.create_node)
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
def test_stop_when_no_cloud_node(self):
self.make_mocks(
arverror.ApiError(httplib2.Response({'status': '500'}), ""))
self.make_actor()
- self.wait_for_call(self.api_client.nodes().create().execute)
self.setup_actor.stop_if_no_cloud_node()
self.assertTrue(
self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
def test_no_stop_when_cloud_node(self):
self.make_actor()
- self.wait_for_call(self.cloud_client.create_node)
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
- self.assertFalse(self.setup_actor.actor_ref.actor_stopped.is_set())
+ self.assertTrue(self.stop_proxy(self.setup_actor),
+ "actor was stopped by stop_if_no_cloud_node")
def test_subscribe(self):
self.make_mocks(
self.setup_actor.subscribe(subscriber)
self.api_client.nodes().create().execute.side_effect = [
testutil.arvados_node_mock()]
- self.wait_for_call(subscriber)
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
self.assertEqual(self.setup_actor.actor_ref.actor_urn,
subscriber.call_args[0][0].actor_ref.actor_urn)
def test_late_subscribe(self):
self.make_actor()
subscriber = mock.Mock(name='subscriber_mock')
- self.wait_for_call(self.cloud_client.create_node)
- self.setup_actor.subscribe(subscriber)
- self.wait_for_call(subscriber)
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.setup_actor)
self.assertEqual(self.setup_actor.actor_ref.actor_urn,
subscriber.call_args[0][0].actor_ref.actor_urn)
def test_easy_shutdown(self):
self.make_actor()
- self.wait_for_call(self.cloud_client.destroy_node)
+ self.shutdown_actor.cloud_node.get(self.TIMEOUT)
+ self.stop_proxy(self.shutdown_actor)
+ self.assertTrue(self.cloud_client.destroy_node.called)
class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
self.make_mocks(node_num)
if start_time is None:
start_time = time.time()
- start_time = time.time()
self.node_actor = cnode.ComputeNodeMonitorActor.start(
self.cloud_mock, start_time, self.shutdowns, self.timer,
self.updates, arv_node).proxy()
- self.node_actor.subscribe(self.subscriber)
+ self.subscription = self.node_actor.subscribe(self.subscriber)
def test_init_shutdown_scheduling(self):
self.make_actor()
- self.wait_for_call(self.timer.schedule)
+ self.subscription.get(self.TIMEOUT)
+ self.assertTrue(self.timer.schedule.called)
self.assertEqual(300, self.timer.schedule.call_args[0][0])
def test_shutdown_subscription(self):
self.make_actor()
self.shutdowns._set_state(True, 600)
- self.node_actor.consider_shutdown()
- self.wait_for_call(self.subscriber)
+ self.node_actor.consider_shutdown().get(self.TIMEOUT)
+ self.assertTrue(self.subscriber.called)
self.assertEqual(self.node_actor.actor_ref.actor_urn,
self.subscriber.call_args[0][0].actor_ref.actor_urn)
def test_shutdown_without_arvados_node(self):
self.make_actor()
self.shutdowns._set_state(True, 600)
- self.node_actor.consider_shutdown()
- self.wait_for_call(self.subscriber)
+ self.node_actor.consider_shutdown().get(self.TIMEOUT)
+ self.assertTrue(self.subscriber.called)
def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
self.make_actor(start_time=0)
self.shutdowns._set_state(True, 600)
- self.node_actor.consider_shutdown()
+ self.node_actor.consider_shutdown().get(self.TIMEOUT)
self.assertFalse(self.subscriber.called)
def check_shutdown_rescheduled(self, window_open, next_window,
schedule_time=None):
self.shutdowns._set_state(window_open, next_window)
self.timer.schedule.reset_mock()
- self.node_actor.consider_shutdown()
- self.wait_for_call(self.timer.schedule)
+ self.node_actor.consider_shutdown().get(self.TIMEOUT)
+ self.stop_proxy(self.node_actor)
+ self.assertTrue(self.timer.schedule.called)
if schedule_time is not None:
self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
self.assertFalse(self.subscriber.called)
self.make_actor(2)
arv_node = testutil.arvados_node_mock(
2, hostname='compute-two.zzzzz.arvadosapi.com')
- pair_future = self.node_actor.offer_arvados_pair(arv_node)
- self.assertEqual(self.cloud_mock.id, pair_future.get(self.TIMEOUT))
- self.wait_for_call(self.updates.sync_node)
+ pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
+ self.assertEqual(self.cloud_mock.id, pair_id)
+ self.stop_proxy(self.node_actor)
self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
def test_arvados_node_mismatch(self):
self.make_actor(3)
arv_node = testutil.arvados_node_mock(1)
- pair_future = self.node_actor.offer_arvados_pair(arv_node)
- self.assertIsNone(pair_future.get(self.TIMEOUT))
+ self.assertIsNone(
+ self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
def test_update_cloud_node(self):
self.make_actor(1)
[54, 5, 1], 8, 600, 3600,
self.node_setup, self.node_shutdown, self.node_factory).proxy()
if cloud_nodes is not None:
- self.daemon.update_cloud_nodes(cloud_nodes)
+ self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
if arvados_nodes is not None:
- self.daemon.update_arvados_nodes(arvados_nodes)
+ self.daemon.update_arvados_nodes(arvados_nodes).get(self.TIMEOUT)
if want_sizes is not None:
- self.daemon.update_server_wishlist(want_sizes)
+ self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
def test_easy_node_creation(self):
size = testutil.MockSize(1)
self.make_daemon(want_sizes=[size])
- self.wait_for_call(self.node_setup.start)
+ self.stop_proxy(self.daemon)
+ self.assertTrue(self.node_setup.start.called)
def test_node_pairing(self):
cloud_node = testutil.cloud_node_mock(1)
arv_node = testutil.arvados_node_mock(1)
self.make_daemon([cloud_node], [arv_node])
- self.wait_for_call(self.node_factory.start)
- pair_func = self.node_factory.start().proxy().offer_arvados_pair
- self.wait_for_call(pair_func)
- pair_func.assert_called_with(arv_node)
+ self.stop_proxy(self.daemon)
+ self.node_factory.start().proxy().offer_arvados_pair.assert_called_with(
+ arv_node)
def test_node_pairing_after_arvados_update(self):
cloud_node = testutil.cloud_node_mock(2)
def test_old_arvados_node_not_double_assigned(self):
arv_node = testutil.arvados_node_mock(3, age=9000)
size = testutil.MockSize(3)
- self.make_daemon(arvados_nodes=[arv_node], want_sizes=[size, size])
- node_starter = self.node_setup.start
- deadline = time.time() + self.TIMEOUT
- while (time.time() < deadline) and (node_starter.call_count < 2):
- time.sleep(.1)
- self.assertEqual(2, node_starter.call_count)
+ self.make_daemon(arvados_nodes=[arv_node])
+ setup_ref = self.node_setup.start().proxy().actor_ref
+ setup_ref.actor_urn = 0
+ self.node_setup.start.reset_mock()
+ self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
+ self.daemon.max_nodes.get(self.TIMEOUT)
+ setup_ref.actor_urn += 1
+ self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
used_nodes = [call[1].get('arvados_node')
- for call in node_starter.call_args_list]
+ for call in self.node_setup.start.call_args_list]
+ self.assertEqual(2, len(used_nodes))
self.assertIn(arv_node, used_nodes)
self.assertIn(None, used_nodes)
def test_node_count_satisfied(self):
- self.make_daemon([testutil.cloud_node_mock()])
- self.daemon.update_server_wishlist(
- [testutil.MockSize(1)]).get(self.TIMEOUT)
+ self.make_daemon([testutil.cloud_node_mock()],
+ want_sizes=[testutil.MockSize(1)])
+ self.stop_proxy(self.daemon)
self.assertFalse(self.node_setup.called)
def test_booting_nodes_counted(self):
arv_node = testutil.arvados_node_mock(1)
server_wishlist = [testutil.MockSize(1)] * 2
self.make_daemon([cloud_node], [arv_node], server_wishlist)
- self.wait_for_call(self.node_setup.start)
- self.node_setup.reset_mock()
+ self.daemon.max_nodes.get(self.TIMEOUT)
+ self.assertTrue(self.node_setup.start.called)
self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
- self.assertFalse(self.node_setup.called)
+ self.stop_proxy(self.daemon)
+ self.assertEqual(1, self.node_setup.start.call_count)
def test_no_duplication_when_booting_node_listed_fast(self):
# Test that we don't start two ComputeNodeMonitorActors when
# get the "node up" message from CloudNodeSetupActor.
cloud_node = testutil.cloud_node_mock(1)
self.make_daemon(want_sizes=[testutil.MockSize(1)])
- self.wait_for_call(self.node_setup.start)
+ self.daemon.max_nodes.get(self.TIMEOUT)
+ self.assertEqual(1, self.node_setup.start.call_count)
setup = mock.MagicMock(name='setup_node_mock')
setup.actor_ref = self.node_setup.start().proxy().actor_ref
setup.cloud_node.get.return_value = cloud_node
setup.arvados_node.get.return_value = testutil.arvados_node_mock(1)
- self.daemon.update_cloud_nodes([cloud_node])
- self.wait_for_call(self.node_factory.start)
- self.node_factory.reset_mock()
+ self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
+ self.assertTrue(self.node_factory.start.called)
self.daemon.node_up(setup).get(self.TIMEOUT)
- self.assertFalse(self.node_factory.start.called)
+ self.assertEqual(1, self.node_factory.start.call_count)
def test_booting_nodes_shut_down(self):
self.make_daemon(want_sizes=[testutil.MockSize(1)])
- self.wait_for_call(self.node_setup.start)
- self.daemon.update_server_wishlist([])
- self.wait_for_call(
- self.node_setup.start().proxy().stop_if_no_cloud_node)
+ self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ self.assertTrue(
+ self.node_setup.start().proxy().stop_if_no_cloud_node.called)
def test_shutdown_declined_at_wishlist_capacity(self):
cloud_node = testutil.cloud_node_mock(1)
self.make_daemon(cloud_nodes=[cloud_node], want_sizes=[size])
node_actor = self.node_factory().proxy()
self.daemon.node_can_shutdown(node_actor).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
self.assertFalse(node_actor.shutdown.called)
def test_shutdown_accepted_below_capacity(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
node_actor = self.node_factory().proxy()
- self.daemon.node_can_shutdown(node_actor)
- self.wait_for_call(self.node_shutdown.start)
+ self.daemon.node_can_shutdown(node_actor).get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ self.assertTrue(self.node_shutdown.start.called)
def test_clean_shutdown_waits_for_node_setup_finish(self):
self.make_daemon(want_sizes=[testutil.MockSize(1)])
- self.wait_for_call(self.node_setup.start)
+ self.daemon.max_nodes.get(self.TIMEOUT)
+ self.assertTrue(self.node_setup.start.called)
new_node = self.node_setup.start().proxy()
- self.daemon.shutdown()
- self.wait_for_call(new_node.stop_if_no_cloud_node)
- self.daemon.node_up(new_node)
- self.wait_for_call(new_node.stop)
+ self.daemon.shutdown().get(self.TIMEOUT)
+ self.assertTrue(new_node.stop_if_no_cloud_node.called)
+ self.daemon.node_up(new_node).get(self.TIMEOUT)
+ self.assertTrue(new_node.stop.called)
self.assertTrue(
self.daemon.actor_ref.actor_stopped.wait(self.TIMEOUT))
def test_wishlist_ignored_after_shutdown(self):
size = testutil.MockSize(2)
self.make_daemon(want_sizes=[size])
- node_starter = self.node_setup.start
- self.wait_for_call(node_starter)
- node_starter.reset_mock()
- self.daemon.shutdown()
- self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
- # Send another message and wait for a response, to make sure all
- # internal messages generated by the wishlist update are processed.
+ self.daemon.shutdown().get(self.TIMEOUT)
self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
- self.assertFalse(node_starter.called)
+ self.stop_proxy(self.daemon)
+ self.assertEqual(1, self.node_setup.start.call_count)
def test_subscribers_get_server_lists(self):
self.build_monitor([{'items': [1, 2]}], self.MockCalculator())
- self.monitor.subscribe(self.subscriber)
- self.wait_for_call(self.subscriber)
+ self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
self.subscriber.assert_called_with([testutil.MockSize(1),
testutil.MockSize(2)])
def test_uuid_is_subscription_key(self):
node = testutil.arvados_node_mock()
self.build_monitor([{'items': [node]}])
- self.monitor.subscribe_to(node['uuid'], self.subscriber)
- self.wait_for_call(self.subscriber)
+ self.monitor.subscribe_to(node['uuid'],
+ self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
self.subscriber.assert_called_with(node)
def test_id_is_subscription_key(self):
node = self.MockNode(1)
self.build_monitor([[node]])
- self.monitor.subscribe_to('1', self.subscriber)
- self.wait_for_call(self.subscriber)
+ self.monitor.subscribe_to('1', self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
self.subscriber.assert_called_with(node)
@testutil.no_sleep
class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
def test_immediate_turnaround(self):
- future = self.FUTURE_CLASS()
+ receiver = mock.Mock()
deliverer = timedcallback.TimedCallBackActor.start().proxy()
- deliverer.schedule(time.time() - 1, future.set, 'immediate')
- self.assertEqual('immediate', future.get(self.TIMEOUT))
+ deliverer.schedule(time.time() - 1, receiver,
+ 'immediate').get(self.TIMEOUT)
+ self.stop_proxy(deliverer)
+ receiver.assert_called_with('immediate')
def test_delayed_turnaround(self):
- future = self.FUTURE_CLASS()
+ receiver = mock.Mock()
with mock.patch('time.time', return_value=0) as mock_now:
deliverer = timedcallback.TimedCallBackActor.start().proxy()
- deliverer.schedule(1, future.set, 'delayed')
- self.assertRaises(pykka.Timeout, future.get, .5)
+ deliverer.schedule(1, receiver, 'delayed')
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.assertFalse(receiver.called)
mock_now.return_value = 2
- self.assertEqual('delayed', future.get(self.TIMEOUT))
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.stop_proxy(deliverer)
+ receiver.assert_called_with('delayed')
def test_out_of_order_scheduling(self):
- future1 = self.FUTURE_CLASS()
- future2 = self.FUTURE_CLASS()
+ receiver = mock.Mock()
with mock.patch('time.time', return_value=1.5) as mock_now:
deliverer = timedcallback.TimedCallBackActor.start().proxy()
- deliverer.schedule(2, future2.set, 'second')
- deliverer.schedule(1, future1.set, 'first')
- self.assertEqual('first', future1.get(self.TIMEOUT))
- self.assertRaises(pykka.Timeout, future2.get, .1)
- mock_now.return_value = 3
- self.assertEqual('second', future2.get(self.TIMEOUT))
+ deliverer.schedule(2, receiver, 'second')
+ deliverer.schedule(1, receiver, 'first')
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ receiver.assert_called_with('first')
+ mock_now.return_value = 2.5
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.stop_proxy(deliverer)
+ receiver.assert_called_with('second')
def test_dead_actors_ignored(self):
receiver = mock.Mock(name='dead_actor', spec=pykka.ActorRef)
receiver.tell.side_effect = pykka.ActorDeadError
deliverer = timedcallback.TimedCallBackActor.start().proxy()
- deliverer.schedule(time.time() - 1, receiver.tell, 'error')
- self.wait_for_call(receiver.tell)
+ deliverer.schedule(time.time() - 1, receiver.tell,
+ 'error').get(self.TIMEOUT)
+ self.assertTrue(self.stop_proxy(deliverer), "deliverer died")
receiver.tell.assert_called_with('error')
- self.assertTrue(deliverer.actor_ref.is_alive(), "deliverer died")
if __name__ == '__main__':
def tearDown(self):
pykka.ActorRegistry.stop_all()
- def wait_for_call(self, mock_func, timeout=TIMEOUT):
+ def stop_proxy(self, proxy):
+ return proxy.actor_ref.stop(timeout=self.TIMEOUT)
+
+ def wait_for_assignment(self, proxy, attr_name, unassigned=None,
+ timeout=TIMEOUT):
deadline = time.time() + timeout
- while (not mock_func.called) and (time.time() < deadline):
- time.sleep(.1)
- self.assertTrue(mock_func.called, "{} not called".format(mock_func))
+ while True:
+ loop_timeout = deadline - time.time()
+ if loop_timeout <= 0:
+ self.fail("actor did not assign {} in time".format(attr_name))
+ result = getattr(proxy, attr_name).get(loop_timeout)
+ if result is not unassigned:
+ return result
class RemotePollLoopActorTestMixin(ActorTestMixin):