closes #7399
[arvados.git] / services / nodemanager / tests / test_failure.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import errno
6 import logging
7 import threading
8 import unittest
9
10 import mock
11 import pykka
12
13 from . import testutil
14
15 import arvnodeman.fullstopactor
16
17 class BogusActor(arvnodeman.fullstopactor.FullStopActor):
18     def __init__(self, e):
19         super(BogusActor, self).__init__()
20         self.exp = e
21
22     def doStuff(self):
23         raise self.exp
24
25 class ActorUnhandledExceptionTest(unittest.TestCase):
26     def test1(self):
27         for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
28             with mock.patch('os.killpg') as killpg_mock:
29                 act = BogusActor.start(e)
30                 act.tell({
31                     'command': 'pykka_call',
32                     'attr_path': ("doStuff",),
33                     'args': [],
34                     'kwargs': {}
35                 })
36                 act.stop(block=True)
37                 self.assertTrue(killpg_mock.called)
38
39         with mock.patch('os.killpg') as killpg_mock:
40             act = BogusActor.start(OSError(errno.ENOENT, ""))
41             act.tell({
42                 'command': 'pykka_call',
43                 'attr_path': ("doStuff",),
44                 'args': [],
45                 'kwargs': {}
46             })
47             act.stop(block=True)
48             self.assertFalse(killpg_mock.called)