15964: Remove qr1hi from a few more places. Delete unused includes.
[arvados.git] / services / nodemanager / arvnodeman / baseactor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import, print_function
6
7 import errno
8 import logging
9 import os
10 import signal
11 import time
12 import threading
13 import traceback
14
15 import pykka
16
17 from .status import tracker
18
19 class _TellCallableProxy(object):
20     """Internal helper class for proxying callables."""
21
22     def __init__(self, ref, attr_path):
23         self.actor_ref = ref
24         self._attr_path = attr_path
25
26     def __call__(self, *args, **kwargs):
27         message = {
28             'command': 'pykka_call',
29             'attr_path': self._attr_path,
30             'args': args,
31             'kwargs': kwargs,
32         }
33         self.actor_ref.tell(message)
34
35
36 class TellActorProxy(pykka.ActorProxy):
37     """ActorProxy in which all calls are implemented as using tell().
38
39     The standard pykka.ActorProxy always uses ask() and returns a Future.  If
40     the target method raises an exception, it is placed in the Future object
41     and re-raised when get() is called on the Future.  Unfortunately, most
42     messaging in Node Manager is asynchronous and the caller does not store the
43     Future object returned by the call to ActorProxy.  As a result, exceptions
44     resulting from these calls end up in limbo, neither reported in the logs
45     nor handled by on_failure().
46
47     The TellActorProxy uses tell() instead of ask() and does not return a
48     Future object.  As a result, if the target method raises an exception, it
49     will be logged and on_failure() will be called as intended.
50
51     """
52
53     def __repr__(self):
54         return '<ActorProxy for %s, attr_path=%s>' % (
55             self.actor_ref, self._attr_path)
56
57     def __getattr__(self, name):
58         """Get a callable from the actor."""
59         attr_path = self._attr_path + (name,)
60         if attr_path not in self._known_attrs:
61             self._known_attrs = self._get_attributes()
62         attr_info = self._known_attrs.get(attr_path)
63         if attr_info is None:
64             raise AttributeError('%s has no attribute "%s"' % (self, name))
65         if attr_info['callable']:
66             if attr_path not in self._callable_proxies:
67                 self._callable_proxies[attr_path] = _TellCallableProxy(
68                     self.actor_ref, attr_path)
69             return self._callable_proxies[attr_path]
70         else:
71             raise AttributeError('attribute "%s" is not a callable on %s' % (name, self))
72
73 class TellableActorRef(pykka.ActorRef):
74     """ActorRef adding the tell_proxy() method to get TellActorProxy."""
75
76     def tell_proxy(self):
77         return TellActorProxy(self)
78
79 class BaseNodeManagerActor(pykka.ThreadingActor):
80     """Base class for actors in node manager, redefining actor_ref as a
81     TellableActorRef and providing a default on_failure handler.
82     """
83
84     def __init__(self, *args, **kwargs):
85          super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
86          self.actor_ref = TellableActorRef(self)
87          self._killfunc = kwargs.get("killfunc", os.kill)
88
89     def on_failure(self, exception_type, exception_value, tb):
90         lg = getattr(self, "_logger", logging)
91         if (exception_type in (threading.ThreadError, MemoryError) or
92             exception_type is OSError and exception_value.errno == errno.ENOMEM):
93             lg.critical("Unhandled exception is a fatal error, killing Node Manager")
94             self._killfunc(os.getpid(), signal.SIGKILL)
95         tracker.counter_add('actor_exceptions')
96
97     def ping(self):
98         return True
99
100     def get_thread(self):
101         return threading.current_thread()
102
103 class WatchdogActor(pykka.ThreadingActor):
104     def __init__(self, timeout, *args, **kwargs):
105          super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
106          self.timeout = timeout
107          self.actors = [a.proxy() for a in args]
108          self.actor_ref = TellableActorRef(self)
109          self._later = self.actor_ref.tell_proxy()
110          self._killfunc = kwargs.get("killfunc", os.kill)
111
112     def kill_self(self, e, act):
113         lg = getattr(self, "_logger", logging)
114         lg.critical("Watchdog exception", exc_info=e)
115         lg.critical("Actor %s watchdog ping time out, killing Node Manager", act)
116         self._killfunc(os.getpid(), signal.SIGKILL)
117
118     def on_start(self):
119         self._later.run()
120
121     def run(self):
122         a = None
123         try:
124             for a in self.actors:
125                 a.ping().get(self.timeout)
126             time.sleep(20)
127             self._later.run()
128         except Exception as e:
129             self.kill_self(e, a)