Merge branch '13201-less-validate' closes #13201
author Peter Amstutz <pamstutz@veritasgenetics.com>
Thu, 5 Apr 2018 17:25:44 +0000 (13:25 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Thu, 5 Apr 2018 17:25:44 +0000 (13:25 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

services/nodemanager/arvnodeman/baseactor.py
services/nodemanager/arvnodeman/computenode/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/driver/__init__.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/status.py
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_driver.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_failure.py
services/nodemanager/tests/test_status.py

index 565db6601f18e68f5f621e0838ee06e051038028..bdfe5d45a7ac444575192b9f1bb95fcadbf18bfe 100644 (file)
@@ -14,6 +14,8 @@ import traceback
 
 import pykka
 
+from .status import tracker
+
 class _TellCallableProxy(object):
     """Internal helper class for proxying callables."""
 
@@ -90,6 +92,7 @@ class BaseNodeManagerActor(pykka.ThreadingActor):
             exception_type is OSError and exception_value.errno == errno.ENOMEM):
             lg.critical("Unhandled exception is a fatal error, killing Node Manager")
             self._killfunc(os.getpid(), signal.SIGKILL)
+        tracker.counter_add('actor_exceptions')
 
     def ping(self):
         return True
index 4e46a438dbf1ea04fb066ccdc45b66f4b196ceae..3c04118abe2ec2bb3f5fee4c1a74e078d61119ed 100644 (file)
@@ -12,6 +12,7 @@ import re
 import time
 
 from ..config import CLOUD_ERRORS
+from ..status import tracker
 from libcloud.common.exceptions import BaseHTTPError, RateLimitReachedError
 
 ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
@@ -101,6 +102,7 @@ class RetryMixin(object):
                         if error.code == 429 or error.code >= 500:
                             should_retry = True
                     except CLOUD_ERRORS as error:
+                        tracker.counter_add('cloud_errors')
                         should_retry = True
                     except errors as error:
                         should_retry = True
@@ -108,9 +110,11 @@ class RetryMixin(object):
                         # As a libcloud workaround for drivers that don't use
                         # typed exceptions, consider bare Exception() objects
                         # retryable.
-                        should_retry = type(error) is Exception
+                        if type(error) is Exception:
+                            tracker.counter_add('cloud_errors')
+                            should_retry = True
                     else:
-                        # No exception,
+                        # No exception
                         self.retry_wait = self.min_retry_wait
                         return ret
 
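A side note on the bare-Exception branch above: only errors whose exact type is Exception are counted as cloud errors and retried (libcloud drivers without typed exceptions raise these), while subclasses fall through to their normal handling. A minimal illustration of that check, not part of the patch:

    # Exact-type test, as used in RetryMixin above: subclasses are excluded.
    class DriverSpecificError(Exception):
        pass

    print(type(Exception("timeout")) is Exception)            # True  -> counted and retried
    print(type(DriverSpecificError("timeout")) is Exception)  # False -> handled elsewhere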
index 37d7088b7a7c65bc8632e21269f465d6850d50b9..9106ea67ccc8ffac7813d64baa5ebc537548fa21 100644 (file)
@@ -20,6 +20,7 @@ from .. import \
     arvados_node_missing, RetryMixin
 from ...clientactor import _notify_subscribers
 from ... import config
+from ... import status
 from .transitions import transitions
 
 QuotaExceeded = "QuotaExceeded"
@@ -272,6 +273,9 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
                 self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
                                      try_resume=True)
                 return
+        # If boot failed, count the event
+        if self._monitor.get_state().get() == 'unpaired':
+            status.tracker.counter_add('boot_failures')
         self._destroy_node()
 
     def _destroy_node(self):
@@ -409,6 +413,12 @@ class ComputeNodeMonitorActor(config.actor_class):
         #if state == 'idle' and self.arvados_node['job_uuid']:
         #    state = 'busy'
 
+        # Update idle node times tracker
+        if state == 'idle':
+            status.tracker.idle_in(self.arvados_node['hostname'])
+        else:
+            status.tracker.idle_out(self.arvados_node['hostname'])
+
         return state
 
     def in_state(self, *states):
index 8f881b04e35341a738b19e1d233d0e851166c9db..7ed7435553647fdc55958337e6d2461345c4098d 100644 (file)
@@ -12,6 +12,7 @@ import libcloud.common.types as cloud_types
 from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
 
 from ...config import CLOUD_ERRORS
+from ...status import tracker
 from .. import RetryMixin
 
 class BaseComputeNodeDriver(RetryMixin):
@@ -123,7 +124,11 @@ class BaseComputeNodeDriver(RetryMixin):
     def list_nodes(self, **kwargs):
         l = self.list_kwargs.copy()
         l.update(kwargs)
-        return self.real.list_nodes(**l)
+        try:
+            return self.real.list_nodes(**l)
+        except CLOUD_ERRORS:
+            tracker.counter_add('list_nodes_errors')
+            raise
 
     def create_cloud_name(self, arvados_node):
         """Return a cloud node name for the given Arvados node record.
@@ -181,6 +186,7 @@ class BaseComputeNodeDriver(RetryMixin):
             try:
                 return self.search_for_now(kwargs['name'], 'list_nodes', self._name_key)
             except ValueError:
+                tracker.counter_add('create_node_errors')
                 raise create_error
 
     def post_create_node(self, cloud_node):
@@ -211,7 +217,7 @@ class BaseComputeNodeDriver(RetryMixin):
     def destroy_node(self, cloud_node):
         try:
             return self.real.destroy_node(cloud_node)
-        except CLOUD_ERRORS as destroy_error:
+        except CLOUD_ERRORS:
             # Sometimes the destroy node request succeeds but times out and
             # raises an exception instead of returning success.  If this
             # happens, we get a noisy stack trace.  Check if the node is still
@@ -223,6 +229,7 @@ class BaseComputeNodeDriver(RetryMixin):
                 # it, which means destroy_node actually succeeded.
                 return True
             # The node is still on the list.  Re-raise.
+            tracker.counter_add('destroy_node_errors')
             raise
 
     # Now that we've defined all our own methods, delegate generic, public
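The destroy_node change above follows the pattern sketched below. This is a hedged reading, not the method's real body: the still-listed check in the actual code is elided from this hunk, and the node-id comparison here is an assumption about libcloud node objects.

    # Sketch: only count and re-raise when the node is still visible to the
    # provider; a node that has already vanished means the destroy succeeded
    # even though the request raised.
    from arvnodeman.config import CLOUD_ERRORS
    from arvnodeman.status import tracker

    def destroy_node_counting_errors(driver, cloud_node):
        try:
            return driver.real.destroy_node(cloud_node)
        except CLOUD_ERRORS:
            if not any(n.id == cloud_node.id for n in driver.list_nodes()):
                return True  # node already gone: treat the destroy as successful
            tracker.counter_add('destroy_node_errors')
            raise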
index a6e73e2622c71c8cf200b09221877175feca7026..1b9f1e70ccc7cbf7b85b37f74e865d3e8a81964d 100644 (file)
@@ -215,8 +215,11 @@ class NodeManagerDaemonActor(actor_class):
             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
                 self.cloud_nodes.add(record)
             else:
-                # Node disappeared from the cloud node list.  Stop the monitor
-                # actor if necessary and forget about the node.
+                # Node disappeared from the cloud node list. If it's paired,
+                # remove its idle time counter.
+                if record.arvados_node:
+                    status.tracker.idle_out(record.arvados_node.get('hostname'))
+                # Stop the monitor actor if necessary and forget about the node.
                 if record.actor:
                     try:
                         record.actor.stop()
@@ -270,6 +273,7 @@ class NodeManagerDaemonActor(actor_class):
             updates.setdefault('nodes_'+s, 0)
             updates['nodes_'+s] += 1
         updates['nodes_wish'] = len(self.last_wishlist)
+        updates['node_quota'] = self.node_quota
         status.tracker.update(updates)
 
     def _state_counts(self, size):
index 069bf168950ad4c33296876d1044b9eebea2a7a4..1e18996da6cacc24a6a0d7a06c6af91682912ef5 100644 (file)
@@ -6,6 +6,7 @@ from __future__ import absolute_import, print_function
 from future import standard_library
 
 import http.server
+import time
 import json
 import logging
 import socketserver
@@ -26,6 +27,7 @@ class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
             return
         self._config = config
         self._tracker = tracker
+        self._tracker.update({'config_max_nodes': config.getint('Daemon', 'max_nodes')})
         super(Server, self).__init__(
             (config.get('Manage', 'address'), port), Handler)
         self._thread = threading.Thread(target=self.serve_forever)
@@ -75,20 +77,53 @@ class Handler(http.server.BaseHTTPRequestHandler, object):
 class Tracker(object):
     def __init__(self):
         self._mtx = threading.Lock()
-        self._latest = {}
+        self._latest = {
+            'list_nodes_errors': 0,
+            'create_node_errors': 0,
+            'destroy_node_errors': 0,
+            'boot_failures': 0,
+            'actor_exceptions': 0
+        }
         self._version = {'Version' : __version__}
+        self._idle_nodes = {}
 
     def get_json(self):
         with self._mtx:
-            return json.dumps(dict(self._latest, **self._version))
+            times = {'idle_times' : {}}
+            now = time.time()
+            for node, ts in self._idle_nodes.items():
+                times['idle_times'][node] = int(now - ts)
+            return json.dumps(
+                dict(dict(self._latest, **self._version), **times))
 
     def keys(self):
         with self._mtx:
             return self._latest.keys()
 
+    def get(self, key):
+        with self._mtx:
+            return self._latest.get(key)
+
     def update(self, updates):
         with self._mtx:
             self._latest.update(updates)
 
+    def counter_add(self, counter, value=1):
+        with self._mtx:
+            self._latest.setdefault(counter, 0)
+            self._latest[counter] += value
+
+    def idle_in(self, nodename):
+        with self._mtx:
+            if self._idle_nodes.get(nodename):
+                return
+            self._idle_nodes[nodename] = time.time()
+
+    def idle_out(self, nodename):
+        with self._mtx:
+            try:
+                del self._idle_nodes[nodename]
+            except KeyError:
+                pass
 
 tracker = Tracker()
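For reference, a minimal usage sketch of the Tracker singleton defined above, exercising just the methods added in this hunk (illustrative only):

    # Counters are created on first use; idle_in()/idle_out() start and stop a
    # per-node idle clock that get_json() reports as whole seconds.
    from arvnodeman.status import tracker

    tracker.counter_add('cloud_errors')   # cloud_errors -> 1
    tracker.idle_in('compute0')           # records time.time() for the node
    tracker.idle_in('compute0')           # no-op: an existing timestamp is kept
    print(tracker.get('cloud_errors'))    # 1
    print(tracker.get_json())             # counters + Version + idle_times
    tracker.idle_out('compute0')          # forgets the node; unknown names are ignored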
index 0a2deb8a9cdd70ca7a72f1ef41b067bbe2f00ea4..5775aa659a31391f13a5071929d9f5562ba3969d 100644 (file)
@@ -17,6 +17,7 @@ import threading
 from libcloud.common.exceptions import BaseHTTPError
 
 import arvnodeman.computenode.dispatch as dispatch
+import arvnodeman.status as status
 from arvnodeman.computenode.driver import BaseComputeNodeDriver
 from . import testutil
 
@@ -207,13 +208,23 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
     def check_success_flag(self, expected, allow_msg_count=1):
         # allow_msg_count is the number of internal messages that may
         # need to be handled for shutdown to finish.
-        for try_num in range(1 + allow_msg_count):
+        for _ in range(1 + allow_msg_count):
             last_flag = self.shutdown_actor.success.get(self.TIMEOUT)
             if last_flag is expected:
                 break
         else:
             self.fail("success flag {} is not {}".format(last_flag, expected))
 
+    def test_boot_failure_counting(self, *mocks):
+        # A boot failure happens when a node transitions from unpaired to shutdown
+        status.tracker.update({'boot_failures': 0})
+        self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="unpaired"))
+        self.cloud_client.destroy_node.return_value = True
+        self.make_actor(cancellable=False)
+        self.check_success_flag(True, 2)
+        self.assertTrue(self.cloud_client.destroy_node.called)
+        self.assertEqual(1, status.tracker.get('boot_failures'))
+
     def test_cancellable_shutdown(self, *mocks):
         self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
         self.cloud_client.destroy_node.return_value = True
@@ -222,11 +233,14 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         self.assertFalse(self.cloud_client.destroy_node.called)
 
     def test_uncancellable_shutdown(self, *mocks):
+        status.tracker.update({'boot_failures': 0})
         self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
         self.cloud_client.destroy_node.return_value = True
         self.make_actor(cancellable=False)
         self.check_success_flag(True, 4)
         self.assertTrue(self.cloud_client.destroy_node.called)
+        # A normal shutdown shouldn't be counted as a boot failure
+        self.assertEqual(0, status.tracker.get('boot_failures'))
 
     def test_arvados_node_cleaned_after_shutdown(self, *mocks):
         if len(mocks) == 1:
@@ -362,16 +376,26 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_state('down'))
 
     def test_in_idle_state(self):
+        idle_nodes_before = status.tracker._idle_nodes.keys()
         self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
         self.assertTrue(self.node_state('idle'))
         self.assertFalse(self.node_state('busy'))
         self.assertTrue(self.node_state('idle', 'busy'))
+        idle_nodes_after = status.tracker._idle_nodes.keys()
+        new_idle_nodes = [n for n in idle_nodes_after if n not in idle_nodes_before]
+        # There should be 1 additional idle node
+        self.assertEqual(1, len(new_idle_nodes))
 
     def test_in_busy_state(self):
+        idle_nodes_before = status.tracker._idle_nodes.keys()
         self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
         self.assertFalse(self.node_state('idle'))
         self.assertTrue(self.node_state('busy'))
         self.assertTrue(self.node_state('idle', 'busy'))
+        idle_nodes_after = status.tracker._idle_nodes.keys()
+        new_idle_nodes = [n for n in idle_nodes_after if n not in idle_nodes_before]
+        # There shouldn't be any additional idle nodes
+        self.assertEqual(0, len(new_idle_nodes))
 
     def test_init_shutdown_scheduling(self):
         self.make_actor()
index 11cd387c1210bc82ea372e61711ebc23c2c43985..128a29e28d24ba4d5f3f8aae1bc535c9c60af043 100644 (file)
@@ -11,6 +11,8 @@ import libcloud.common.types as cloud_types
 import mock
 
 import arvnodeman.computenode.driver as driver_base
+import arvnodeman.status as status
+import arvnodeman.config as config
 from . import testutil
 
 class ComputeNodeDriverTestCase(unittest.TestCase):
@@ -62,3 +64,50 @@ class ComputeNodeDriverTestCase(unittest.TestCase):
         self.assertIs(driver.search_for('id_1', 'list_images'),
                       driver.search_for('id_1', 'list_images'))
         self.assertEqual(1, self.driver_mock().list_images.call_count)
+
+
+    class TestBaseComputeNodeDriver(driver_base.BaseComputeNodeDriver):
+        def arvados_create_kwargs(self, size, arvados_node):
+            return {'name': arvados_node}
+
+
+    def test_create_node_only_cloud_errors_are_counted(self):
+        status.tracker.update({'create_node_errors': 0})
+        errors = [(config.CLOUD_ERRORS[0], True), (KeyError, False)]
+        self.driver_mock().list_images.return_value = []
+        driver = self.TestBaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+        error_count = 0
+        for an_error, is_cloud_error in errors:
+            self.driver_mock().create_node.side_effect = an_error
+            with self.assertRaises(an_error):
+                driver.create_node('1', 'id_1')
+            if is_cloud_error:
+                error_count += 1
+            self.assertEqual(error_count, status.tracker.get('create_node_errors'))
+
+    def test_list_nodes_only_cloud_errors_are_counted(self):
+        status.tracker.update({'list_nodes_errors': 0})
+        errors = [(config.CLOUD_ERRORS[0], True), (KeyError, False)]
+        driver = self.TestBaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+        error_count = 0
+        for an_error, is_cloud_error in errors:
+            self.driver_mock().list_nodes.side_effect = an_error
+            with self.assertRaises(an_error):
+                driver.list_nodes()
+            if is_cloud_error:
+                error_count += 1
+            self.assertEqual(error_count, status.tracker.get('list_nodes_errors'))
+
+    def test_destroy_node_only_cloud_errors_are_counted(self):
+        status.tracker.update({'destroy_node_errors': 0})
+        errors = [(config.CLOUD_ERRORS[0], True), (KeyError, False)]
+        self.driver_mock().list_nodes.return_value = [testutil.MockSize(1)]
+        driver = self.TestBaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+        error_count = 0
+        for an_error, is_cloud_error in errors:
+            self.driver_mock().destroy_node.side_effect = an_error
+            with self.assertRaises(an_error):
+                driver.destroy_node(testutil.MockSize(1))
+            if is_cloud_error:
+                error_count += 1
+            self.assertEqual(error_count, status.tracker.get('destroy_node_errors'))
index 4b2b2d4bf8c1c6bac42f0a49e59047d2414eeb08..8050e6981411d69f127617e0cb2b44681470341d 100644 (file)
@@ -143,6 +143,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         size = testutil.MockSize(1)
         self.make_daemon(want_sizes=[size])
         self.busywait(lambda: self.node_setup.start.called)
+        self.assertIn('node_quota', status.tracker._latest)
 
     def check_monitors_arvados_nodes(self, *arv_nodes):
         self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
@@ -664,6 +665,27 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
         self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
 
+    def test_idle_node_disappearing_clears_status_idle_time_counter(self):
+        size = testutil.MockSize(1)
+        status.tracker._idle_nodes = {}
+        cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
+        arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
+        self.make_daemon(cloud_nodes, arv_nodes, [size])
+        self.busywait(lambda: 1 == self.paired_monitor_count())
+        for mon_ref in self.monitor_list():
+            monitor = mon_ref.proxy()
+            if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
+                break
+        else:
+            self.fail("monitor for idle node not found")
+        self.assertEqual(1, status.tracker.get('nodes_idle'))
+        hostname = monitor.arvados_node.get()['hostname']
+        self.assertIn(hostname, status.tracker._idle_nodes)
+        # Simulate the node disappearing from the cloud node list
+        self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
+        self.busywait(lambda: 0 == self.alive_monitor_count())
+        self.assertNotIn(hostname, status.tracker._idle_nodes)
+
     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
         self.assertEqual(1, self.alive_monitor_count())
index ef4423dafaf7762b5d8a8c95fdbaacf630156917..2d1a17eaecd82a7cea0be0103d27fc2e8f07c4c5 100644 (file)
@@ -17,6 +17,7 @@ import pykka
 from . import testutil
 
 import arvnodeman.baseactor
+import arvnodeman.status as status
 
 class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
     def __init__(self, e, killfunc=None):
@@ -45,11 +46,13 @@ class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
             self.assertTrue(kill_mock.called)
 
     def test_nonfatal_error(self):
+        status.tracker.update({'actor_exceptions': 0})
         kill_mock = mock.Mock('os.kill')
         act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
         act.doStuff()
         act.actor_ref.stop(block=True)
         self.assertFalse(kill_mock.called)
+        self.assertEqual(1, status.tracker.get('actor_exceptions'))
 
 class WatchdogActorTest(testutil.ActorTestMixin, unittest.TestCase):
 
index 23658667a61843147854bd97304107e416c0d568..2a1c0fc58960c8a36a3decde1d0a611643c2c0a8 100644 (file)
@@ -7,6 +7,8 @@ from __future__ import absolute_import, print_function
 from future import standard_library
 
 import json
+import mock
+import random
 import requests
 import unittest
 
@@ -57,6 +59,44 @@ class StatusServerUpdates(unittest.TestCase):
                 self.assertEqual(n, resp['nodes_'+str(n)])
             self.assertEqual(1, resp['nodes_1'])
             self.assertIn('Version', resp)
+            self.assertIn('config_max_nodes', resp)
+
+    def test_counters(self):
+        with TestServer() as srv:
+            resp = srv.get_status()
+            # Test counters existence
+            for counter in ['list_nodes_errors', 'create_node_errors',
+                'destroy_node_errors', 'boot_failures', 'actor_exceptions']:
+                self.assertIn(counter, resp)
+            # Test counter increment
+            for count in range(1, 3):
+                status.tracker.counter_add('a_counter')
+                resp = srv.get_status()
+                self.assertEqual(count, resp['a_counter'])
+
+    @mock.patch('time.time')
+    def test_idle_times(self, time_mock):
+        with TestServer() as srv:
+            resp = srv.get_status()
+            node_name = 'idle_compute{}'.format(random.randint(1, 1024))
+            self.assertIn('idle_times', resp)
+            # Test add an idle node
+            time_mock.return_value = 10
+            status.tracker.idle_in(node_name)
+            time_mock.return_value += 10
+            resp = srv.get_status()
+            self.assertEqual(10, resp['idle_times'][node_name])
+            # Test adding the same idle node a 2nd time
+            time_mock.return_value += 10
+            status.tracker.idle_in(node_name)
+            time_mock.return_value += 10
+            resp = srv.get_status()
+            # Idle timestamp doesn't get reset if it already exists
+            self.assertEqual(30, resp['idle_times'][node_name])
+            # Test remove idle node
+            status.tracker.idle_out(node_name)
+            resp = srv.get_status()
+            self.assertNotIn(node_name, resp['idle_times'])
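Taken together, these tests pin down what the status endpoint reports after this change. A hedged summary in the same style, reusing the tests' TestServer helper (field values depend on runtime state):

    # Illustrative only: the response is the Tracker's JSON payload.
    with TestServer() as srv:
        resp = srv.get_status()
        # Besides 'Version', resp now includes 'config_max_nodes', the error
        # counters ('list_nodes_errors', 'create_node_errors',
        # 'destroy_node_errors', 'boot_failures', 'actor_exceptions') and an
        # 'idle_times' dict mapping hostnames to seconds spent idle; once the
        # daemon actor publishes an update it also carries 'node_quota' and
        # the 'nodes_*' gauges.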
 
 
 class StatusServerDisabled(unittest.TestCase):