1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import absolute_import, print_function
17 from .status import tracker
19 class _TellCallableProxy(object):
20 """Internal helper class for proxying callables."""
22 def __init__(self, ref, attr_path):
24 self._attr_path = attr_path
26 def __call__(self, *args, **kwargs):
28 'command': 'pykka_call',
29 'attr_path': self._attr_path,
33 self.actor_ref.tell(message)
36 class TellActorProxy(pykka.ActorProxy):
37 """ActorProxy in which all calls are implemented as using tell().
39 The standard pykka.ActorProxy always uses ask() and returns a Future. If
40 the target method raises an exception, it is placed in the Future object
41 and re-raised when get() is called on the Future. Unfortunately, most
42 messaging in Node Manager is asynchronous and the caller does not store the
43 Future object returned by the call to ActorProxy. As a result, exceptions
44 resulting from these calls end up in limbo, neither reported in the logs
45 nor handled by on_failure().
47 The TellActorProxy uses tell() instead of ask() and does not return a
48 Future object. As a result, if the target method raises an exception, it
49 will be logged and on_failure() will be called as intended.
54 return '<ActorProxy for %s, attr_path=%s>' % (
55 self.actor_ref, self._attr_path)
57 def __getattr__(self, name):
58 """Get a callable from the actor."""
59 attr_path = self._attr_path + (name,)
60 if attr_path not in self._known_attrs:
61 self._known_attrs = self._get_attributes()
62 attr_info = self._known_attrs.get(attr_path)
64 raise AttributeError('%s has no attribute "%s"' % (self, name))
65 if attr_info['callable']:
66 if attr_path not in self._callable_proxies:
67 self._callable_proxies[attr_path] = _TellCallableProxy(
68 self.actor_ref, attr_path)
69 return self._callable_proxies[attr_path]
71 raise AttributeError('attribute "%s" is not a callable on %s' % (name, self))
73 class TellableActorRef(pykka.ActorRef):
74 """ActorRef adding the tell_proxy() method to get TellActorProxy."""
77 return TellActorProxy(self)
79 class BaseNodeManagerActor(pykka.ThreadingActor):
80 """Base class for actors in node manager, redefining actor_ref as a
81 TellableActorRef and providing a default on_failure handler.
def __init__(self, *args, **kwargs):
    """Initialise the actor and install a TellableActorRef as actor_ref.

    All positional and keyword arguments are forwarded unchanged to the
    next ``__init__`` after ``pykka.ThreadingActor`` in the MRO.  The
    optional ``killfunc`` keyword argument (default ``os.kill``) is also
    stored as ``self._killfunc``; on_failure() calls it with
    ``(os.getpid(), signal.SIGKILL)`` for fatal errors.
    """
    # NOTE(review): super() is anchored at pykka.ThreadingActor, so lookup
    # starts *after* ThreadingActor in the MRO; any __init__ defined by
    # ThreadingActor itself is skipped.  Confirm this is intentional.
    super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
    # Swap in a ref that offers tell_proxy(), so callers can send
    # fire-and-forget messages whose failures reach on_failure().
    self.actor_ref = TellableActorRef(self)
    kill = kwargs.get("killfunc", os.kill)
    self._killfunc = kill
def on_failure(self, exception_type, exception_value, tb):
    """Handle an exception that escaped one of this actor's handlers.

    The following exception types are treated as fatal, matched exactly
    rather than via issubclass:
    - ``threading.ThreadError``
    - ``MemoryError``
    - ``OSError`` whose ``errno`` is ``ENOMEM``

    For a fatal error, a CRITICAL message is logged and
    ``self._killfunc(os.getpid(), signal.SIGKILL)`` is called.  If that call
    returns (e.g. a test stub), the failure is then recorded with
    ``tracker.counter_add('actor_exceptions')``, which is done for every
    failure.

    :param exception_type: class of the unhandled exception.
    :param exception_value: the exception instance.
    :param tb: traceback object; not used here.
    """
    # Prefer a per-actor logger when the subclass defines one.
    logger = getattr(self, "_logger", logging)
    # NOTE(review): on Python 3.3+ threading.ThreadError is an alias of
    # RuntimeError, so this check would treat *any* RuntimeError as fatal.
    fatal = exception_type in (threading.ThreadError, MemoryError)
    if not fatal and exception_type is OSError:
        fatal = exception_value.errno == errno.ENOMEM
    if fatal:
        logger.critical("Unhandled exception is a fatal error, killing Node Manager")
        self._killfunc(os.getpid(), signal.SIGKILL)
    tracker.counter_add('actor_exceptions')
def get_thread(self):
    """Return the ``threading.Thread`` object executing this call.

    When the call arrives through a pykka proxy on a ThreadingActor, it runs
    on the actor's own thread, so the result identifies that thread.
    """
    current = threading.current_thread()
    return current
103 class WatchdogActor(pykka.ThreadingActor):
def __init__(self, timeout, *args, **kwargs):
    """Set up a watchdog over the actor refs passed positionally.

    :param timeout: passed as the timeout to ``Future.get()`` when waiting
        for each watched actor's ping reply.
    :param args: actor refs to watch; a ``.proxy()`` is created for each and
        kept in ``self.actors``.  They are also forwarded to the base
        ``__init__``.
    :param kwargs: forwarded to the base ``__init__``; an optional
        ``killfunc`` (default ``os.kill``) is stored as ``self._killfunc``.
    """
    # Same MRO-skipping super() anchor as the other actors in this module.
    super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
    self.timeout = timeout
    watched = []
    for ref in args:
        watched.append(ref.proxy())
    self.actors = watched
    # actor_ref must be the TellableActorRef before tell_proxy() is used.
    self.actor_ref = TellableActorRef(self)
    # tell()-based proxy to this actor itself.
    self._later = self.actor_ref.tell_proxy()
    self._killfunc = kwargs.get("killfunc", os.kill)
def kill_self(self, e, act):
    """Log a watchdog failure and SIGKILL the current process.

    :param e: the exception that was caught; logged via ``exc_info``.
    :param act: the actor whose ping failed; interpolated into the log line.
    """
    log = getattr(self, "_logger", logging)
    log.critical("Watchdog exception", exc_info=e)
    log.critical("Actor %s watchdog ping time out, killing Node Manager", act)
    pid = os.getpid()
    self._killfunc(pid, signal.SIGKILL)
124 for a in self.actors:
125 a.ping().get(self.timeout)
128 except Exception as e: