Merge branch '13383-trash-workers'
[arvados.git] / services / nodemanager / arvnodeman / status.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import, print_function
6 from future import standard_library
7
8 import http.server
9 import time
10 import json
11 import logging
12 import socketserver
13 import threading
14
15 from ._version import __version__
16
17 _logger = logging.getLogger('status.Handler')  # used by Server's "disabled" warning and Handler.log_message
18
19
class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
    """Threaded HTTP management server for the node manager.

    Listens on the [Manage] address/port from *config* and dispatches
    requests to Handler.  A negative [Manage] port disables the server:
    construction then only logs a warning, binds nothing, and start() is a
    no-op.
    """

    # Run each request in a daemon thread.  Non-daemon handler threads can
    # keep the process alive on exit, and on some Python 3 releases
    # ThreadingMixIn keeps a reference to every non-daemon handler thread
    # until server_close(), which nothing in this module calls.
    daemon_threads = True

    def __init__(self, config):
        port = config.getint('Manage', 'port')
        self.enabled = port >= 0
        if not self.enabled:
            _logger.warning("Management server disabled. "+
                            "Use [Manage] config section to enable.")
            return
        # Handler.check_auth reads the management token via self.server._config.
        self._config = config
        self._tracker = tracker
        self._tracker.update(
            {'config_max_nodes': config.getint('Daemon', 'max_nodes')})
        super(Server, self).__init__(
            (config.get('Manage', 'address'), port), Handler)
        # serve_forever runs in a daemon thread so it never blocks shutdown.
        self._thread = threading.Thread(target=self.serve_forever)
        self._thread.daemon = True

    def start(self):
        """Begin serving in the background thread; no-op when disabled."""
        if self.enabled:
            self._thread.start()
39
40
class Handler(http.server.BaseHTTPRequestHandler, object):
    """Request handler for the management server.

    Routes (GET only):
      /status.json   -- the shared tracker's state as JSON; no auth check.
      /_health/ping  -- {"health": "OK"} if check_auth() succeeds, otherwise
                        the status code and plain-text message it returns.
      anything else  -- empty 404.
    """

    def do_GET(self):
        if self.path == '/status.json':
            self._write_response(200, 'application/json', tracker.get_json())
        elif self.path == '/_health/ping':
            code, msg = self.check_auth()
            if code != 200:
                self._write_response(code, 'text/plain', msg)
            else:
                self._write_response(200, 'application/json',
                                     json.dumps({"health": "OK"}))
        else:
            self._write_response(404, 'text/plain', '')

    def _write_response(self, code, content_type, body):
        """Send a complete response: status line, headers, blank line, body.

        *body* may be text or bytes; text is UTF-8 encoded because wfile
        is a binary stream on Python 3.
        """
        payload = self._as_bytes(body)
        self.send_response(code)
        self.send_header('Content-type', content_type)
        self.send_header('Content-Length', str(len(payload)))
        # end_headers() terminates the header block and, on Python 3, is
        # what actually flushes the buffered status line and headers.
        self.end_headers()
        self.wfile.write(payload)

    @staticmethod
    def _as_bytes(text):
        """Return *text* as bytes, UTF-8 encoding it if it is not already."""
        if isinstance(text, bytes):
            return text
        return text.encode('utf-8')

    def log_message(self, fmt, *args, **kwargs):
        # Route the base class's access/error log lines to the module logger.
        _logger.info(fmt, *args, **kwargs)

    def check_auth(self):
        """Validate the request's bearer token for the health endpoint.

        Returns:
            (code, message): (404, "disabled") when [Manage] ManagementToken
            is empty; (401, ...) when there is no Authorization header;
            (403, ...) when it is not "Bearer <token>"; (200, "") otherwise.
        """
        import hmac  # constant-time comparison of the client-supplied token

        mgmt_token = self.server._config.get('Manage', 'ManagementToken')
        auth_header = self.headers.get('Authorization', None)

        if mgmt_token == '':
            return 404, "disabled"
        elif auth_header is None:
            return 401, "authorization required"
        elif not hmac.compare_digest(self._as_bytes(auth_header),
                                     self._as_bytes('Bearer ' + mgmt_token)):
            return 403, "authorization error"
        return 200, ""
76
class Tracker(object):
    """Lock-protected store of node manager status values.

    Holds a dict of named values (error counters pre-seeded to 0, plus any
    keys added through update() or counter_add()), the package version, and
    the time at which each currently idle node was first reported idle.
    Every public method acquires the instance's lock.
    """

    def __init__(self):
        self._mtx = threading.Lock()
        self._latest = {
            'list_nodes_errors': 0,
            'create_node_errors': 0,
            'destroy_node_errors': 0,
            'boot_failures': 0,
            'actor_exceptions': 0
        }
        self._version = {'Version' : __version__}
        # node name -> time.time() when idle_in() first saw it idle.
        self._idle_nodes = {}

    def get_json(self):
        """Return a JSON string of all values plus 'Version' and 'idle_times'.

        'idle_times' maps each idle node name to the whole number of seconds
        since it was first reported idle.
        """
        with self._mtx:
            now = time.time()
            snapshot = dict(self._latest)
            snapshot.update(self._version)
            snapshot['idle_times'] = {
                node: int(now - since)
                for node, since in self._idle_nodes.items()
            }
            return json.dumps(snapshot)

    def keys(self):
        """Return a list of the tracked value names.

        The list is copied under the lock: on Python 3 dict.keys() is a live
        view, and handing that out would let callers iterate the dict while
        other threads modify it.
        """
        with self._mtx:
            return list(self._latest.keys())

    def get(self, key):
        """Return the value stored under *key*, or None if absent."""
        with self._mtx:
            return self._latest.get(key)

    def update(self, updates):
        """Merge the mapping *updates* into the stored values."""
        with self._mtx:
            self._latest.update(updates)

    def counter_add(self, counter, value=1):
        """Add *value* to *counter*, creating it at 0 first if needed."""
        with self._mtx:
            self._latest[counter] = self._latest.get(counter, 0) + value

    def idle_in(self, nodename):
        """Mark *nodename* idle; an existing idle timestamp is kept."""
        with self._mtx:
            if nodename not in self._idle_nodes:
                self._idle_nodes[nodename] = time.time()

    def idle_out(self, nodename):
        """Clear *nodename*'s idle timestamp; unknown names are ignored."""
        with self._mtx:
            self._idle_nodes.pop(nodename, None)
128
129 tracker = Tracker()  # shared module-level instance: Server.__init__ stores it and Handler.do_GET serves its JSON