12199: Update integration tests.
[arvados.git] / sdk / go / health / aggregator.go
1 package health
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "fmt"
8         "net"
9         "net/http"
10         "sync"
11         "time"
12
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "git.curoverse.com/arvados.git/sdk/go/auth"
15 )
16
17 const defaultTimeout = arvados.Duration(2 * time.Second)
18
19 // Aggregator implements http.Handler. It handles "GET /_health/all"
20 // by checking the health of all configured services on the cluster
21 // and responding 200 if everything is healthy.
22 type Aggregator struct {
23         setupOnce  sync.Once
24         httpClient *http.Client
25         timeout    arvados.Duration
26
27         Config *arvados.Config
28
29         // If non-nil, Log is called after handling each request.
30         Log func(*http.Request, error)
31 }
32
33 func (agg *Aggregator) setup() {
34         agg.httpClient = http.DefaultClient
35         if agg.timeout == 0 {
36                 // this is always the case, except in the test suite
37                 agg.timeout = defaultTimeout
38         }
39 }
40
41 func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
42         agg.setupOnce.Do(agg.setup)
43         sendErr := func(statusCode int, err error) {
44                 resp.WriteHeader(statusCode)
45                 json.NewEncoder(resp).Encode(map[string]string{"error": err.Error()})
46                 if agg.Log != nil {
47                         agg.Log(req, err)
48                 }
49         }
50
51         resp.Header().Set("Content-Type", "application/json")
52
53         cluster, err := agg.Config.GetCluster("")
54         if err != nil {
55                 err = fmt.Errorf("arvados.GetCluster(): %s", err)
56                 sendErr(http.StatusInternalServerError, err)
57                 return
58         }
59         if !agg.checkAuth(req, cluster) {
60                 sendErr(http.StatusUnauthorized, errUnauthorized)
61                 return
62         }
63         if req.URL.Path != "/_health/all" {
64                 sendErr(http.StatusNotFound, errNotFound)
65                 return
66         }
67         json.NewEncoder(resp).Encode(agg.ClusterHealth(cluster))
68         if agg.Log != nil {
69                 agg.Log(req, nil)
70         }
71 }
72
73 type ClusterHealthResponse struct {
74         // "OK" if all needed services are OK, otherwise "ERROR".
75         Health string `json:"health"`
76
77         // An entry for each known health check of each known instance
78         // of each needed component: "instance of service S on node N
79         // reports health-check C is OK."
80         Checks map[string]CheckResult `json:"checks"`
81
82         // An entry for each service type: "service S is OK." This
83         // exposes problems that can't be expressed in Checks, like
84         // "service S is needed, but isn't configured to run
85         // anywhere."
86         Services map[string]ServiceHealth `json:"services"`
87 }
88
// CheckResult describes the outcome of a single health check against
// one service instance.
type CheckResult struct {
	// Health is "OK" or "ERROR".
	Health string `json:"health"`
	// Error is the failure description; omitted from JSON when empty.
	Error string `json:"error,omitempty"`
	// HTTPStatusCode and HTTPStatusText are taken from the ping
	// response, when one was received; omitted from JSON when unset.
	HTTPStatusCode int    `json:",omitempty"`
	HTTPStatusText string `json:",omitempty"`
	// Response is the decoded JSON body of the ping response.
	Response map[string]interface{} `json:"response"`
	// ResponseTime is the elapsed time of the check in seconds,
	// formatted with six decimal places.
	ResponseTime json.Number `json:"responseTime"`
}
97
// ServiceHealth summarizes the health of one service type across the
// cluster.
type ServiceHealth struct {
	// Health is "OK" once at least one instance has passed its
	// check, and "ERROR" otherwise.
	Health string `json:"health"`
	// N is the number of instances that passed their check.
	N int `json:"n"`
}
102
103 func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResponse {
104         resp := ClusterHealthResponse{
105                 Health:   "OK",
106                 Checks:   make(map[string]CheckResult),
107                 Services: make(map[string]ServiceHealth),
108         }
109
110         mtx := sync.Mutex{}
111         wg := sync.WaitGroup{}
112         for node, nodeConfig := range cluster.SystemNodes {
113                 for svc, addr := range nodeConfig.ServicePorts() {
114                         // Ensure svc is listed in resp.Services.
115                         mtx.Lock()
116                         if _, ok := resp.Services[svc]; !ok {
117                                 resp.Services[svc] = ServiceHealth{Health: "ERROR"}
118                         }
119                         mtx.Unlock()
120
121                         if addr == "" {
122                                 // svc is not expected on this node.
123                                 continue
124                         }
125
126                         wg.Add(1)
127                         go func(node, svc, addr string) {
128                                 defer wg.Done()
129                                 var result CheckResult
130                                 url, err := agg.pingURL(node, addr)
131                                 if err != nil {
132                                         result = CheckResult{
133                                                 Health: "ERROR",
134                                                 Error:  err.Error(),
135                                         }
136                                 } else {
137                                         result = agg.ping(url, cluster)
138                                 }
139
140                                 mtx.Lock()
141                                 defer mtx.Unlock()
142                                 resp.Checks[svc+"+"+url] = result
143                                 if result.Health == "OK" {
144                                         h := resp.Services[svc]
145                                         h.N++
146                                         h.Health = "OK"
147                                         resp.Services[svc] = h
148                                 } else {
149                                         resp.Health = "ERROR"
150                                 }
151                         }(node, svc, addr)
152                 }
153         }
154         wg.Wait()
155
156         // Report ERROR if a needed service didn't fail any checks
157         // merely because it isn't configured to run anywhere.
158         for _, sh := range resp.Services {
159                 if sh.Health != "OK" {
160                         resp.Health = "ERROR"
161                         break
162                 }
163         }
164         return resp
165 }
166
167 func (agg *Aggregator) pingURL(node, addr string) (string, error) {
168         _, port, err := net.SplitHostPort(addr)
169         return "http://" + node + ":" + port + "/_health/ping", err
170 }
171
172 func (agg *Aggregator) ping(url string, cluster *arvados.Cluster) (result CheckResult) {
173         t0 := time.Now()
174
175         var err error
176         defer func() {
177                 result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
178                 if err != nil {
179                         result.Health, result.Error = "ERROR", err.Error()
180                 } else {
181                         result.Health = "OK"
182                 }
183         }()
184
185         req, err := http.NewRequest("GET", url, nil)
186         if err != nil {
187                 return
188         }
189         req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
190
191         ctx, cancel := context.WithTimeout(req.Context(), time.Duration(agg.timeout))
192         defer cancel()
193         req = req.WithContext(ctx)
194         resp, err := agg.httpClient.Do(req)
195         if err != nil {
196                 return
197         }
198         result.HTTPStatusCode = resp.StatusCode
199         result.HTTPStatusText = resp.Status
200         err = json.NewDecoder(resp.Body).Decode(&result.Response)
201         if err != nil {
202                 err = fmt.Errorf("cannot decode response: %s", err)
203         } else if resp.StatusCode != http.StatusOK {
204                 err = fmt.Errorf("HTTP %d %s", resp.StatusCode, resp.Status)
205         } else if h, _ := result.Response["health"].(string); h != "OK" {
206                 if e, ok := result.Response["error"].(string); ok && e != "" {
207                         err = errors.New(e)
208                 } else {
209                         err = fmt.Errorf("health=%q in ping response", h)
210                 }
211         }
212         return
213 }
214
215 func (agg *Aggregator) checkAuth(req *http.Request, cluster *arvados.Cluster) bool {
216         creds := auth.NewCredentialsFromHTTPRequest(req)
217         for _, token := range creds.Tokens {
218                 if token != "" && token == cluster.ManagementToken {
219                         return true
220                 }
221         }
222         return false
223 }