12085: Test actor & cloud errors counting.
[arvados.git] / services / nodemanager / arvnodeman / status.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import, print_function
6 from future import standard_library
7
8 import http.server
9 import json
10 import logging
11 import socketserver
12 import threading
13
14 from ._version import __version__
15
# Logger shared by Server (the "disabled" warning) and Handler.log_message
# (per-request log lines).
_logger = logging.getLogger(name='status.Handler')
17
18
class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
    """Threaded HTTP server for the node manager's management endpoints.

    Requests are dispatched to ``Handler``.  If ``[Manage] port`` is
    negative the server stays disabled: ``__init__`` returns before the
    base-class constructor runs (so no socket is bound) and ``start()``
    does nothing.
    """

    def __init__(self, config):
        listen_port = config.getint('Manage', 'port')
        self.enabled = listen_port >= 0
        if listen_port < 0:
            _logger.warning("Management server disabled. "
                            "Use [Manage] config section to enable.")
            return

        self._config = config
        self._tracker = tracker
        max_nodes = config.getint('Daemon', 'max_nodes')
        self._tracker.update({'config_max_nodes': max_nodes})

        bind_address = (config.get('Manage', 'address'), listen_port)
        super(Server, self).__init__(bind_address, Handler)

        # Daemon thread: the serving loop must not keep the process alive
        # once the main program exits.
        server_thread = threading.Thread(target=self.serve_forever)
        server_thread.daemon = True
        self._thread = server_thread

    def start(self):
        """Start serving in the background thread; a no-op when disabled."""
        if not self.enabled:
            return
        self._thread.start()
38
39
class Handler(http.server.BaseHTTPRequestHandler, object):
    """Request handler for the management HTTP endpoints.

    Routes:
      ``/status.json``  -- the module-level ``tracker``'s JSON snapshot.
      ``/_health/ping`` -- ``{"health": "OK"}``, guarded by the
                           ``[Manage] ManagementToken`` bearer token.
      anything else     -- 404 with an empty body.
    """

    @staticmethod
    def _as_bytes(value):
        """Return *value* as bytes, UTF-8 encoding it if it is text."""
        if isinstance(value, bytes):
            return value
        return value.encode('utf-8')

    def _respond(self, code, body, content_type='text/plain'):
        """Send a complete response: status line, headers, blank line, body.

        ``wfile`` only accepts bytes on Python 3, so text bodies are encoded
        first.  ``end_headers()`` is always called.  It writes the blank line
        that terminates the header section and, on Python 3, flushes the
        buffered status line and headers.  Without it the client receives
        the body with a missing or corrupt HTTP header.
        """
        payload = self._as_bytes(body)
        self.send_response(code)
        self.send_header('Content-type', content_type)
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        """Serve a GET request according to ``self.path``."""
        if self.path == '/status.json':
            self._respond(200, tracker.get_json(), 'application/json')
        elif self.path == '/_health/ping':
            code, msg = self.check_auth()
            if code != 200:
                self._respond(code, msg)
            else:
                self._respond(200, json.dumps({"health": "OK"}),
                              'application/json')
        else:
            self._respond(404, b'')

    def log_message(self, fmt, *args, **kwargs):
        """Route http.server's request/error log lines to ``_logger``."""
        _logger.info(fmt, *args, **kwargs)

    def check_auth(self):
        """Check the request's bearer token against [Manage] ManagementToken.

        Returns a ``(status_code, message)`` tuple:
          * ``(404, "disabled")`` if the configured token is empty,
          * ``(401, "authorization required")`` if there is no
            Authorization header,
          * ``(403, "authorization error")`` if the header is not exactly
            ``"Bearer <token>"``,
          * ``(200, "")`` otherwise.
        """
        import hmac

        mgmt_token = self.server._config.get('Manage', 'ManagementToken')
        auth_header = self.headers.get('Authorization', None)

        if mgmt_token == '':
            return 404, "disabled"
        if auth_header is None:
            return 401, "authorization required"
        # The header is untrusted input: compare in constant time so the
        # response latency does not reveal how much of a guess matched.
        expected = 'Bearer ' + mgmt_token
        if not hmac.compare_digest(self._as_bytes(auth_header),
                                   self._as_bytes(expected)):
            return 403, "authorization error"
        return 200, ""
75
class Tracker(object):
    """Lock-protected store of the values reported by ``/status.json``.

    Every method reads or writes the underlying dict only while holding
    ``_mtx``.
    """

    def __init__(self):
        self._mtx = threading.Lock()
        # Pre-seeded so these counters are reported (as 0) by get_json()
        # before anything increments them.
        self._latest = {
            'cloud_errors': 0,
            'boot_failures': 0,
            'actor_exceptions': 0
        }
        self._version = {'Version' : __version__}

    def get_json(self):
        """Return all tracked values plus the ``Version`` entry as JSON text."""
        with self._mtx:
            return json.dumps(dict(self._latest, **self._version))

    def keys(self):
        """Return a list snapshot of the tracked keys.

        The copy is taken under the lock.  On Python 3 ``dict.keys()`` is a
        live view, and returning it directly would let callers iterate the
        dict unlocked while other threads add keys.
        """
        with self._mtx:
            return list(self._latest)

    def get(self, key, default=None):
        """Return the value for *key*, or *default* if it is not tracked."""
        with self._mtx:
            return self._latest.get(key, default)

    def update(self, updates):
        """Merge the mapping *updates* into the tracked values."""
        with self._mtx:
            self._latest.update(updates)

    def counter_add(self, counter, value=1):
        """Add *value* to *counter*, treating a missing counter as 0."""
        with self._mtx:
            self._latest[counter] = self._latest.get(counter, 0) + value
106
# Module-level Tracker instance: Server stores it as self._tracker (and
# records config_max_nodes in it), and Handler.do_GET serves its
# get_json() output at /status.json.
tracker = Tracker()