from future import standard_library
import json
+import mock
+import random
import requests
import unittest
def test_counters(self):
    """Check that the status response exposes and updates tracker counters.

    First verifies that each expected error/failure counter key is present
    in the status response, then verifies that incrementing a counter via
    ``status.tracker.counter_add`` is reflected in subsequent responses.
    """
    with TestServer() as srv:
        resp = srv.get_status()
        # Test counters existence
        for counter in ['list_nodes_errors', 'create_node_errors',
                        'destroy_node_errors', 'boot_failures',
                        'actor_exceptions']:
            self.assertIn(counter, resp)
        # Test counter increment: each counter_add() call is expected to
        # raise the reported value of 'a_counter' by one (1, then 2).
        for count in range(1, 3):
            status.tracker.counter_add('a_counter')
            resp = srv.get_status()
            self.assertEqual(count, resp['a_counter'])
@mock.patch('time.time')
def test_idle_times(self, time_mock):
    """Check that idle nodes are reported with their elapsed idle time.

    ``time.time`` is patched so the test controls the clock. The
    assertions expect the reported idle time to be the current (mocked)
    time minus the time of the node's first ``idle_in`` call.

    :param time_mock: mock object replacing ``time.time``, injected by
        the ``mock.patch`` decorator.
    """
    with TestServer() as srv:
        resp = srv.get_status()
        node_name = 'idle_compute{}'.format(random.randint(1, 1024))
        self.assertIn('idle_times', resp)
        # Test add an idle node: marked idle at t=10, queried at t=20,
        # so 10 seconds of idle time are expected.
        time_mock.return_value = 10
        status.tracker.idle_in(node_name)
        time_mock.return_value += 10
        resp = srv.get_status()
        self.assertEqual(10, resp['idle_times'][node_name])
        # Test adding the same idle node a 2nd time: second idle_in at
        # t=30, queried at t=40.
        time_mock.return_value += 10
        status.tracker.idle_in(node_name)
        time_mock.return_value += 10
        resp = srv.get_status()
        # Idle timestamp doesn't get reset if already exists, so the
        # expected value is 40 - 10 = 30 rather than 40 - 30 = 10.
        self.assertEqual(30, resp['idle_times'][node_name])
        # Test remove idle node
        status.tracker.idle_out(node_name)
        resp = srv.get_status()
        self.assertNotIn(node_name, resp['idle_times'])

class StatusServerDisabled(unittest.TestCase):
def test_config_disabled(self):