import pykka
+from .status import tracker
+
class _TellCallableProxy(object):
"""Internal helper class for proxying callables."""
exception_type is OSError and exception_value.errno == errno.ENOMEM):
lg.critical("Unhandled exception is a fatal error, killing Node Manager")
self._killfunc(os.getpid(), signal.SIGKILL)
+ tracker.counter_add('actor_exceptions')
def ping(self):
return True
import time
from ..config import CLOUD_ERRORS
+from ..status import tracker
from libcloud.common.exceptions import BaseHTTPError, RateLimitReachedError
ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
if error.code == 429 or error.code >= 500:
should_retry = True
except CLOUD_ERRORS as error:
+ tracker.counter_add('cloud_errors')
should_retry = True
except errors as error:
should_retry = True
# As a libcloud workaround for drivers that don't use
# typed exceptions, consider bare Exception() objects
# retryable.
- should_retry = type(error) is Exception
+ if type(error) is Exception:
+ tracker.counter_add('cloud_errors')
+ should_retry = True
else:
- # No exception,
+ # No exception
self.retry_wait = self.min_retry_wait
return ret
arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
+from ... import status
from .transitions import transitions
QuotaExceeded = "QuotaExceeded"
self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
try_resume=True)
return
+ # If boot failed, count the event
+ if self._monitor.get_state().get() == 'unpaired':
+ status.tracker.counter_add('boot_failures')
self._destroy_node()
def _destroy_node(self):
#if state == 'idle' and self.arvados_node['job_uuid']:
# state = 'busy'
+ # Update idle node times tracker
+ if state == 'idle':
+ status.tracker.idle_in(self.arvados_node['hostname'])
+ else:
+ status.tracker.idle_out(self.arvados_node['hostname'])
+
return state
def in_state(self, *states):
from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
from ...config import CLOUD_ERRORS
+from ...status import tracker
from .. import RetryMixin
class BaseComputeNodeDriver(RetryMixin):
def list_nodes(self, **kwargs):
l = self.list_kwargs.copy()
l.update(kwargs)
- return self.real.list_nodes(**l)
+ try:
+ return self.real.list_nodes(**l)
+ except CLOUD_ERRORS:
+ tracker.counter_add('list_nodes_errors')
+ raise
def create_cloud_name(self, arvados_node):
"""Return a cloud node name for the given Arvados node record.
try:
return self.search_for_now(kwargs['name'], 'list_nodes', self._name_key)
except ValueError:
+ tracker.counter_add('create_node_errors')
raise create_error
def post_create_node(self, cloud_node):
def destroy_node(self, cloud_node):
try:
return self.real.destroy_node(cloud_node)
- except CLOUD_ERRORS as destroy_error:
+ except CLOUD_ERRORS:
# Sometimes the destroy node request succeeds but times out and
# raises an exception instead of returning success. If this
# happens, we get a noisy stack trace. Check if the node is still
# it, which means destroy_node actually succeeded.
return True
# The node is still on the list. Re-raise.
+ tracker.counter_add('destroy_node_errors')
raise
# Now that we've defined all our own methods, delegate generic, public
if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
self.cloud_nodes.add(record)
else:
- # Node disappeared from the cloud node list. Stop the monitor
- # actor if necessary and forget about the node.
+ # Node disappeared from the cloud node list. If it's paired,
+ # remove its idle time counter.
+ if record.arvados_node:
+ status.tracker.idle_out(record.arvados_node.get('hostname'))
+ # Stop the monitor actor if necessary and forget about the node.
if record.actor:
try:
record.actor.stop()
updates.setdefault('nodes_'+s, 0)
updates['nodes_'+s] += 1
updates['nodes_wish'] = len(self.last_wishlist)
+ updates['node_quota'] = self.node_quota
status.tracker.update(updates)
def _state_counts(self, size):
from future import standard_library
import http.server
+import time
import json
import logging
import socketserver
return
self._config = config
self._tracker = tracker
+ self._tracker.update({'config_max_nodes': config.getint('Daemon', 'max_nodes')})
super(Server, self).__init__(
(config.get('Manage', 'address'), port), Handler)
self._thread = threading.Thread(target=self.serve_forever)
class Tracker(object):
    """Thread-safe store of node manager status metrics.

    Holds the latest counters/gauges (served as JSON by the status server)
    plus a map of idle nodes and the time each one became idle.  All public
    methods take the internal lock, so a single shared instance may be used
    from any actor thread.
    """
    def __init__(self):
        self._mtx = threading.Lock()
        # Pre-seed the error counters so they show up as 0 in status
        # output even before the first event is counted.
        self._latest = {
            'list_nodes_errors': 0,
            'create_node_errors': 0,
            'destroy_node_errors': 0,
            'boot_failures': 0,
            'actor_exceptions': 0,
        }
        self._version = {'Version': __version__}
        # Maps node hostname -> timestamp (time.time()) when it went idle.
        self._idle_nodes = {}

    def get_json(self):
        """Return all metrics, the version, and per-node idle seconds as JSON."""
        with self._mtx:
            now = time.time()
            # Idle times are reported as whole seconds elapsed since idle_in().
            times = {'idle_times': {
                node: int(now - ts) for node, ts in self._idle_nodes.items()
            }}
            return json.dumps(
                dict(dict(self._latest, **self._version), **times))

    def keys(self):
        with self._mtx:
            return self._latest.keys()

    def get(self, key):
        """Return the current value for key, or None if it was never set."""
        with self._mtx:
            return self._latest.get(key)

    def update(self, updates):
        """Merge the updates dict into the tracked metrics."""
        with self._mtx:
            self._latest.update(updates)

    def counter_add(self, counter, value=1):
        """Add value to counter, creating it at 0 if it doesn't exist yet."""
        with self._mtx:
            self._latest.setdefault(counter, 0)
            self._latest[counter] += value

    def idle_in(self, nodename):
        """Start tracking nodename as idle; a repeat call keeps the original timestamp."""
        with self._mtx:
            if self._idle_nodes.get(nodename):
                return
            self._idle_nodes[nodename] = time.time()

    def idle_out(self, nodename):
        """Stop tracking nodename as idle; a no-op if it wasn't being tracked."""
        with self._mtx:
            self._idle_nodes.pop(nodename, None)
tracker = Tracker()
from libcloud.common.exceptions import BaseHTTPError
import arvnodeman.computenode.dispatch as dispatch
+import arvnodeman.status as status
from arvnodeman.computenode.driver import BaseComputeNodeDriver
from . import testutil
def check_success_flag(self, expected, allow_msg_count=1):
# allow_msg_count is the number of internal messages that may
# need to be handled for shutdown to finish.
- for try_num in range(1 + allow_msg_count):
+ for _ in range(1 + allow_msg_count):
last_flag = self.shutdown_actor.success.get(self.TIMEOUT)
if last_flag is expected:
break
else:
self.fail("success flag {} is not {}".format(last_flag, expected))
+ def test_boot_failure_counting(self, *mocks):
+ # A boot failure happens when a node transitions from unpaired to shutdown
+ status.tracker.update({'boot_failures': 0})
+ self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="unpaired"))
+ self.cloud_client.destroy_node.return_value = True
+ self.make_actor(cancellable=False)
+ self.check_success_flag(True, 2)
+ self.assertTrue(self.cloud_client.destroy_node.called)
+ self.assertEqual(1, status.tracker.get('boot_failures'))
+
def test_cancellable_shutdown(self, *mocks):
self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
self.cloud_client.destroy_node.return_value = True
self.assertFalse(self.cloud_client.destroy_node.called)
def test_uncancellable_shutdown(self, *mocks):
+ status.tracker.update({'boot_failures': 0})
self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
self.cloud_client.destroy_node.return_value = True
self.make_actor(cancellable=False)
self.check_success_flag(True, 4)
self.assertTrue(self.cloud_client.destroy_node.called)
+ # A normal shutdown shouldn't be counted as boot failure
+ self.assertEqual(0, status.tracker.get('boot_failures'))
def test_arvados_node_cleaned_after_shutdown(self, *mocks):
if len(mocks) == 1:
self.assertTrue(self.node_state('down'))
def test_in_idle_state(self):
+ idle_nodes_before = status.tracker._idle_nodes.keys()
self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
self.assertTrue(self.node_state('idle'))
self.assertFalse(self.node_state('busy'))
self.assertTrue(self.node_state('idle', 'busy'))
+ idle_nodes_after = status.tracker._idle_nodes.keys()
+ new_idle_nodes = [n for n in idle_nodes_after if n not in idle_nodes_before]
+ # There should be 1 additional idle node
+ self.assertEqual(1, len(new_idle_nodes))
def test_in_busy_state(self):
+ idle_nodes_before = status.tracker._idle_nodes.keys()
self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
self.assertFalse(self.node_state('idle'))
self.assertTrue(self.node_state('busy'))
self.assertTrue(self.node_state('idle', 'busy'))
+ idle_nodes_after = status.tracker._idle_nodes.keys()
+ new_idle_nodes = [n for n in idle_nodes_after if n not in idle_nodes_before]
+ # There shouldn't be any additional idle node
+ self.assertEqual(0, len(new_idle_nodes))
def test_init_shutdown_scheduling(self):
self.make_actor()
import mock
import arvnodeman.computenode.driver as driver_base
+import arvnodeman.status as status
+import arvnodeman.config as config
from . import testutil
class ComputeNodeDriverTestCase(unittest.TestCase):
self.assertIs(driver.search_for('id_1', 'list_images'),
driver.search_for('id_1', 'list_images'))
self.assertEqual(1, self.driver_mock().list_images.call_count)
+
+
+ class TestBaseComputeNodeDriver(driver_base.BaseComputeNodeDriver):
+ def arvados_create_kwargs(self, size, arvados_node):
+ return {'name': arvados_node}
+
+
+ def test_create_node_only_cloud_errors_are_counted(self):
+ status.tracker.update({'create_node_errors': 0})
+ errors = [(config.CLOUD_ERRORS[0], True), (KeyError, False)]
+ self.driver_mock().list_images.return_value = []
+ driver = self.TestBaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+ error_count = 0
+ for an_error, is_cloud_error in errors:
+ self.driver_mock().create_node.side_effect = an_error
+ with self.assertRaises(an_error):
+ driver.create_node('1', 'id_1')
+ if is_cloud_error:
+ error_count += 1
+ self.assertEqual(error_count, status.tracker.get('create_node_errors'))
+
+ def test_list_nodes_only_cloud_errors_are_counted(self):
+ status.tracker.update({'list_nodes_errors': 0})
+ errors = [(config.CLOUD_ERRORS[0], True), (KeyError, False)]
+ driver = self.TestBaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+ error_count = 0
+ for an_error, is_cloud_error in errors:
+ self.driver_mock().list_nodes.side_effect = an_error
+ with self.assertRaises(an_error):
+ driver.list_nodes()
+ if is_cloud_error:
+ error_count += 1
+ self.assertEqual(error_count, status.tracker.get('list_nodes_errors'))
+
+ def test_destroy_node_only_cloud_errors_are_counted(self):
+ status.tracker.update({'destroy_node_errors': 0})
+ errors = [(config.CLOUD_ERRORS[0], True), (KeyError, False)]
+ self.driver_mock().list_nodes.return_value = [testutil.MockSize(1)]
+ driver = self.TestBaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+ error_count = 0
+ for an_error, is_cloud_error in errors:
+ self.driver_mock().destroy_node.side_effect = an_error
+ with self.assertRaises(an_error):
+ driver.destroy_node(testutil.MockSize(1))
+ if is_cloud_error:
+ error_count += 1
+ self.assertEqual(error_count, status.tracker.get('destroy_node_errors'))
size = testutil.MockSize(1)
self.make_daemon(want_sizes=[size])
self.busywait(lambda: self.node_setup.start.called)
+ self.assertIn('node_quota', status.tracker._latest)
def check_monitors_arvados_nodes(self, *arv_nodes):
self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
+ def test_idle_node_disappearing_clears_status_idle_time_counter(self):
+ size = testutil.MockSize(1)
+ status.tracker._idle_nodes = {}
+ cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
+ arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
+ self.make_daemon(cloud_nodes, arv_nodes, [size])
+ self.busywait(lambda: 1 == self.paired_monitor_count())
+ for mon_ref in self.monitor_list():
+ monitor = mon_ref.proxy()
+ if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
+ break
+ else:
+ self.fail("monitor for idle node not found")
+ self.assertEqual(1, status.tracker.get('nodes_idle'))
+ hostname = monitor.arvados_node.get()['hostname']
+ self.assertIn(hostname, status.tracker._idle_nodes)
+ # Simulate the node disappearing from the cloud node list
+ self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
+ self.busywait(lambda: 0 == self.alive_monitor_count())
+ self.assertNotIn(hostname, status.tracker._idle_nodes)
+
def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
self.assertEqual(1, self.alive_monitor_count())
from . import testutil
import arvnodeman.baseactor
+import arvnodeman.status as status
class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
def __init__(self, e, killfunc=None):
self.assertTrue(kill_mock.called)
def test_nonfatal_error(self):
+ status.tracker.update({'actor_exceptions': 0})
kill_mock = mock.Mock('os.kill')
act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
act.doStuff()
act.actor_ref.stop(block=True)
self.assertFalse(kill_mock.called)
+ self.assertEqual(1, status.tracker.get('actor_exceptions'))
class WatchdogActorTest(testutil.ActorTestMixin, unittest.TestCase):
from future import standard_library
import json
+import mock
+import random
import requests
import unittest
self.assertEqual(n, resp['nodes_'+str(n)])
self.assertEqual(1, resp['nodes_1'])
self.assertIn('Version', resp)
+ self.assertIn('config_max_nodes', resp)
+
+ def test_counters(self):
+ with TestServer() as srv:
+ resp = srv.get_status()
+ # Test counters existance
+ for counter in ['list_nodes_errors', 'create_node_errors',
+ 'destroy_node_errors', 'boot_failures', 'actor_exceptions']:
+ self.assertIn(counter, resp)
+ # Test counter increment
+ for count in range(1, 3):
+ status.tracker.counter_add('a_counter')
+ resp = srv.get_status()
+ self.assertEqual(count, resp['a_counter'])
+
+ @mock.patch('time.time')
+ def test_idle_times(self, time_mock):
+ with TestServer() as srv:
+ resp = srv.get_status()
+ node_name = 'idle_compute{}'.format(random.randint(1, 1024))
+ self.assertIn('idle_times', resp)
+ # Test add an idle node
+ time_mock.return_value = 10
+ status.tracker.idle_in(node_name)
+ time_mock.return_value += 10
+ resp = srv.get_status()
+ self.assertEqual(10, resp['idle_times'][node_name])
+ # Test adding the same idle node a 2nd time
+ time_mock.return_value += 10
+ status.tracker.idle_in(node_name)
+ time_mock.return_value += 10
+ resp = srv.get_status()
+ # Idle timestamp doesn't get reset if already exists
+ self.assertEqual(30, resp['idle_times'][node_name])
+ # Test remove idle node
+ status.tracker.idle_out(node_name)
+ resp = srv.get_status()
+ self.assertNotIn(node_name, resp['idle_times'])
class StatusServerDisabled(unittest.TestCase):