2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
17 from . import testutil
19 import arvnodeman.baseactor
# Test helper actor derived from arvnodeman.baseactor.BaseNodeManagerActor.
# The tests in this file start it with an exception instance as `e` and,
# in some cases, a mock `killfunc`.
21 class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
# e: value supplied by the caller (the tests below pass exception instances).
# killfunc: optional callable, forwarded unchanged to the base-class constructor.
22 def __init__(self, e, killfunc=None):
23 super(BogusActor, self).__init__(killfunc=killfunc)
# NOTE(review): no statement visible here uses `e`, and the listing numbering
# jumps from 23 to 30, so part of this class is not shown. The comment below
# presumably belongs to a method with a deliberate delay -- confirm against
# the full file before changing anything here.
30 # Called by WatchdogActorTest, this delay is longer than the test timeout
31 # of 1 second, which should cause the watchdog ping to fail.
# Checks whether the kill function handed to a BogusActor gets called,
# depending on which exception instance the actor was constructed with.
35 class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
# For each of MemoryError, threading.ThreadError and OSError(ENOMEM): start a
# BogusActor with a fresh mock kill function, stop the actor, and assert that
# the mock was called.
36 def test_fatal_error(self):
37 for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
# NOTE(review): the string is passed as Mock's first positional parameter
# (spec); this does not patch os.kill itself.
38 kill_mock = mock.Mock('os.kill')
39 bgact = BogusActor.start(e, killfunc=kill_mock)
# Presumably resolves the actor's thread through a proxy call and a blocking
# .get() on the returned future -- get_thread() is not defined in this view.
40 act_thread = bgact.proxy().get_thread().get()
41 act = bgact.tell_proxy()
# NOTE(review): the listing numbering skips 42 and 44, so the statement that
# triggers the error and any use of act_thread are not visible here.
# Stop the actor and wait for it (block=True) before checking the mock.
43 act.actor_ref.stop(block=True)
45 self.assertTrue(kill_mock.called)
# Counterpart of test_fatal_error: with OSError(ENOENT) the mock kill function
# is expected NOT to have been called once the actor has stopped.
47 def test_nonfatal_error(self):
48 kill_mock = mock.Mock('os.kill')
49 act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
# NOTE(review): the listing numbering skips 50; the triggering call is not visible.
51 act.actor_ref.stop(block=True)
52 self.assertFalse(kill_mock.called)
# Checks that a WatchdogActor watching a BogusActor ends up calling the kill
# function it was given.
54 class WatchdogActorTest(testutil.ActorTestMixin, unittest.TestCase):
# NOTE(review): "timout" looks like a typo for "timeout"; the name is left
# as-is because it is the test's identifier.
56 def test_time_timout(self):
57 kill_mock = mock.Mock('os.kill')
# The watched actor is started without a killfunc; only the watchdog gets the mock.
58 act = BogusActor.start(OSError(errno.ENOENT, ""))
# The first argument (1) is presumably the watchdog timeout in seconds, matching
# the "test timeout of 1 second" mentioned in BogusActor's comment -- confirm
# against WatchdogActor's constructor.
59 watch = arvnodeman.baseactor.WatchdogActor.start(1, act, killfunc=kill_mock)
# NOTE(review): the listing numbering skips 60 and 62, so any wait before
# stopping the watchdog and any cleanup of `act` are not visible here.
61 watch.stop(block=True)
63 self.assertTrue(kill_mock.called)