import pykka
+from .status import tracker
+
class _TellCallableProxy(object):
"""Internal helper class for proxying callables."""
exception_type is OSError and exception_value.errno == errno.ENOMEM):
lg.critical("Unhandled exception is a fatal error, killing Node Manager")
self._killfunc(os.getpid(), signal.SIGKILL)
+ tracker.counter_add('actor_exceptions')
# Unconditionally answer True; no instance state is read.
# NOTE(review): presumably a liveness probe callers use to confirm the
# receiver still responds - confirm against the call sites.
def ping(self):
return True
import time
from ..config import CLOUD_ERRORS
+from ..status import tracker
from libcloud.common.exceptions import BaseHTTPError, RateLimitReachedError
ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
if error.code == 429 or error.code >= 500:
should_retry = True
except CLOUD_ERRORS as error:
+ tracker.counter_add('cloud_errors')
should_retry = True
except errors as error:
should_retry = True
# As a libcloud workaround for drivers that don't use
# typed exceptions, consider bare Exception() objects
# retryable.
- should_retry = type(error) is Exception
+ if type(error) is Exception:
+ tracker.counter_add('cloud_errors')
+ should_retry = True
else:
- # No exception,
+ # No exception
self.retry_wait = self.min_retry_wait
return ret
from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
from ...config import CLOUD_ERRORS
+from ...status import tracker
from .. import RetryMixin
class BaseComputeNodeDriver(RetryMixin):
# List nodes through the wrapped driver (`self.real`), passing the stored
# `self.list_kwargs` merged with the caller's keyword arguments.
def list_nodes(self, **kwargs):
# Merge into a copy so the stored `list_kwargs` object is not modified;
# keys supplied by the caller take precedence over the stored ones.
l = self.list_kwargs.copy()
l.update(kwargs)
# The `-` line is the previous form (a bare call). The `+` lines replace it:
# an exception matching CLOUD_ERRORS is counted under 'cloud_errors' in the
# shared tracker and then re-raised unchanged, so callers see the same error.
- return self.real.list_nodes(**l)
+ try:
+ return self.real.list_nodes(**l)
+ except CLOUD_ERRORS:
+ tracker.counter_add('cloud_errors')
+ raise
def create_cloud_name(self, arvados_node):
"""Return a cloud node name for the given Arvados node record.
try:
return self.search_for_now(kwargs['name'], 'list_nodes', self._name_key)
except ValueError:
+ tracker.counter_add('cloud_errors')
raise create_error
def post_create_node(self, cloud_node):
def destroy_node(self, cloud_node):
try:
return self.real.destroy_node(cloud_node)
- except CLOUD_ERRORS as destroy_error:
+ except CLOUD_ERRORS:
# Sometimes the destroy node request succeeds but times out and
# raises an exception instead of returning success. If this
# happens, we get a noisy stack trace. Check if the node is still
# it, which means destroy_node actually succeeded.
return True
# The node is still on the list. Re-raise.
+ tracker.counter_add('cloud_errors')
raise
# Now that we've defined all our own methods, delegate generic, public
return
self._config = config
self._tracker = tracker
+ self._tracker.update({'config_max_nodes': config.getint('Daemon', 'max_nodes')})
super(Server, self).__init__(
(config.get('Manage', 'address'), port), Handler)
self._thread = threading.Thread(target=self.serve_forever)
class Tracker(object):
# Create the lock, the status dictionary and the version record.
def __init__(self):
# Lock taken by the methods of this class before they touch `_latest`.
self._mtx = threading.Lock()
# The `-` line started from an empty dictionary. The `+` lines replace it and
# pre-seed three counters at zero, so these keys exist in `_latest` before
# any event has been counted.
- self._latest = {}
+ self._latest = {
+ 'cloud_errors': 0,
+ 'boot_failures': 0,
+ 'actor_exceptions': 0
+ }
# Kept separately from `_latest`; holds only the package version string.
self._version = {'Version' : __version__}
def get_json(self):
with self._mtx:
self._latest.update(updates)
# Add `value` (default 1) to the entry named `counter` in `_latest`.
# Both the lookup and the addition happen while `self._mtx` is held.
+ def counter_add(self, counter, value=1):
+ with self._mtx:
# setdefault gives a name that is not yet present a starting value of 0,
# so the in-place addition below never hits a missing key.
+ self._latest.setdefault(counter, 0)
+ self._latest[counter] += value
tracker = Tracker()
self.assertEqual(n, resp['nodes_'+str(n)])
self.assertEqual(1, resp['nodes_1'])
self.assertIn('Version', resp)
+ self.assertIn('config_max_nodes', resp)
+
# Verify the counters exposed in the status response: the three predefined
# ones must be present with value 0, and a counter created through
# status.tracker.counter_add must show each increment.
+ def test_counters(self):
+ with TestServer() as srv:
+ resp = srv.get_status()
+ # Test initial values
+ for counter in ['cloud_errors', 'boot_failures', 'actor_exceptions']:
+ self.assertIn(counter, resp)
+ self.assertEqual(0, resp[counter])
+ # Test counter increment
# range(1, 3) yields 1 and 2: after each single counter_add call the status
# is fetched again and 'a_counter' is expected to equal the loop value.
+ for count in range(1, 3):
+ status.tracker.counter_add('a_counter')
+ resp = srv.get_status()
+ self.assertEqual(count, resp['a_counter'])
class StatusServerDisabled(unittest.TestCase):