self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(tracker.get_json())
+ elif self.path == '/_health/ping':
+ code, msg = self.check_auth()
+
+ if code != 200:
+ self.send_response(code)
+ self.wfile.write(msg)
+ else:
+ self.send_response(200)
+ self.send_header('Content-type', 'application/json')
+ self.end_headers()
+ self.wfile.write(json.dumps({"health":"OK"}))
else:
self.send_response(404)
def log_message(self, fmt, *args, **kwargs):
_logger.info(fmt, *args, **kwargs)
+ def check_auth(self):
+ mgmt_token = self.server._config.get('Manage', 'management_token')
+ auth_header = self.headers.get('Authorization', None)
+
+ if mgmt_token == '':
+ return 404, "disabled"
+ elif auth_header == None:
+ return 401, "authorization required"
+ elif auth_header != 'Bearer '+mgmt_token:
+ return 403, "authorization error"
+ return 200, ""
class Tracker(object):
def __init__(self):
from __future__ import absolute_import, print_function
from future import standard_library
+import json
import requests
import unittest
class TestServer(object):
+ def __init__(self, management_token=None):
+ self.mgmt_token = management_token
+
def __enter__(self):
cfg = config.NodeManagerConfig()
cfg.set('Manage', 'port', '0')
cfg.set('Manage', 'address', '127.0.0.1')
+ if self.mgmt_token != None:
+ cfg.set('Manage', 'management_token', self.mgmt_token)
self.srv = status.Server(cfg)
self.srv.start()
addr, port = self.srv.server_address
def get_status(self):
return self.get_status_response().json()
+ def get_healthcheck_ping(self, auth_header=None):
+ headers = {}
+ if auth_header != None:
+ headers['Authorization'] = auth_header
+ return requests.get(self.srv_base+'/_health/ping', headers=headers)
class StatusServerUpdates(unittest.TestCase):
def test_updates(self):
self.srv.start()
self.assertFalse(self.srv.enabled)
self.assertFalse(getattr(self.srv, '_thread', False))
+
+class HealthcheckPing(unittest.TestCase):
+ def test_ping_disabled(self):
+ with TestServer() as srv:
+ r = srv.get_healthcheck_ping()
+ self.assertEqual(404, r.status_code)
+
+ def test_ping_no_auth(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping()
+ self.assertEqual(401, r.status_code)
+
+ def test_ping_bad_auth_format(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('noBearer')
+ self.assertEqual(403, r.status_code)
+
+ def test_ping_bad_auth_token(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('Bearer badtoken')
+ self.assertEqual(403, r.status_code)
+
+ def test_ping_success(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('Bearer configuredmanagementtoken')
+ self.assertEqual(200, r.status_code)
+ self.assertEqual('application/json', r.headers['content-type'])
+ resp = r.json()
+ self.assertEqual('{"health": "OK"}', json.dumps(resp))