18794: Fail health check on mismatched configs.
[arvados.git] / sdk / go / health / aggregator.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package health
6
7 import (
8         "bufio"
9         "context"
10         "crypto/tls"
11         "encoding/json"
12         "errors"
13         "flag"
14         "fmt"
15         "io"
16         "net/http"
17         "net/url"
18         "regexp"
19         "strconv"
20         "strings"
21         "sync"
22         "time"
23
24         "git.arvados.org/arvados.git/lib/cmd"
25         "git.arvados.org/arvados.git/lib/config"
26         "git.arvados.org/arvados.git/sdk/go/arvados"
27         "git.arvados.org/arvados.git/sdk/go/auth"
28         "git.arvados.org/arvados.git/sdk/go/ctxlog"
29         "github.com/ghodss/yaml"
30         "github.com/sirupsen/logrus"
31 )
32
33 const defaultTimeout = arvados.Duration(2 * time.Second)
34
// Aggregator implements service.Handler. It handles "GET /_health/all"
// by checking the health of all configured services on the cluster
// and responding with a JSON ClusterHealthResponse. The response
// status is 200 either way; the report's Health field is "OK" only if
// everything is healthy.
type Aggregator struct {
	// setupOnce guards the one-time initialization done by setup
	// (httpClient, and timeout if it is zero).
	setupOnce  sync.Once
	// httpClient is used for every ping and metrics request.
	httpClient *http.Client
	// timeout bounds each ping and metrics request. Zero means
	// defaultTimeout (applied by setup).
	timeout    arvados.Duration

	// Cluster is the configuration whose services are checked. Its
	// ManagementToken is used both to authorize incoming requests and
	// as the bearer token on outgoing checks.
	Cluster *arvados.Cluster

	// If non-nil, Log is called after handling each request.
	Log func(*http.Request, error)
}
48
49 func (agg *Aggregator) setup() {
50         agg.httpClient = &http.Client{
51                 Transport: &http.Transport{
52                         TLSClientConfig: &tls.Config{
53                                 InsecureSkipVerify: agg.Cluster.TLS.Insecure,
54                         },
55                 },
56         }
57         if agg.timeout == 0 {
58                 // this is always the case, except in the test suite
59                 agg.timeout = defaultTimeout
60         }
61 }
62
63 func (agg *Aggregator) CheckHealth() error {
64         return nil
65 }
66
67 func (agg *Aggregator) Done() <-chan struct{} {
68         return nil
69 }
70
71 func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
72         agg.setupOnce.Do(agg.setup)
73         sendErr := func(statusCode int, err error) {
74                 resp.WriteHeader(statusCode)
75                 json.NewEncoder(resp).Encode(map[string]string{"error": err.Error()})
76                 if agg.Log != nil {
77                         agg.Log(req, err)
78                 }
79         }
80
81         resp.Header().Set("Content-Type", "application/json")
82
83         if !agg.checkAuth(req) {
84                 sendErr(http.StatusUnauthorized, errUnauthorized)
85                 return
86         }
87         if req.URL.Path == "/_health/all" {
88                 json.NewEncoder(resp).Encode(agg.ClusterHealth())
89         } else if req.URL.Path == "/_health/ping" {
90                 resp.Write(healthyBody)
91         } else {
92                 sendErr(http.StatusNotFound, errNotFound)
93                 return
94         }
95         if agg.Log != nil {
96                 agg.Log(req, nil)
97         }
98 }
99
// ClusterHealthResponse is the report built by ClusterHealth. It is
// what "GET /_health/all" returns as JSON, and what the check command
// prints as YAML.
type ClusterHealthResponse struct {
	// "OK" if all needed services are OK and no config mismatch was
	// detected, otherwise "ERROR".
	Health string `json:"health"`

	// An entry for each known health check of each known instance
	// of each needed component: "instance of service S on node N
	// reports health-check C is OK." Keys have the form
	// "<service name>+<ping URL>".
	Checks map[string]CheckResult `json:"checks"`

	// An entry for each service type: "service S is OK." This
	// exposes problems that can't be expressed in Checks, like
	// "service S is needed, but isn't configured to run
	// anywhere."
	Services map[arvados.ServiceName]ServiceHealth `json:"services"`

	// Problems that aren't tied to a single check. Currently these
	// are "outdated config" messages for instances whose config hash
	// differs from the newest one reported. Nil when there are none.
	Errors []string `json:"errors"`
}
117
// CheckResult is the outcome of checking one instance (one URL) of
// one service. ClusterHealth stores one per checked URL in
// ClusterHealthResponse.Checks.
type CheckResult struct {
	// "OK" if the instance passed its ping check, otherwise "ERROR".
	Health         string                 `json:"health"`
	// Why the check failed. This can also be set when Health is
	// "OK", if the ping succeeded but fetching metrics failed.
	Error          string                 `json:"error,omitempty"`
	// Numeric status code and status text (resp.Status, e.g.
	// "200 OK") of the ping response; empty if no response arrived.
	HTTPStatusCode int                    `json:",omitempty"`
	HTTPStatusText string                 `json:",omitempty"`
	// The ping response body, decoded as a JSON object.
	Response       map[string]interface{} `json:"response"`
	// Duration of the ping request in seconds, formatted with six
	// decimal places.
	ResponseTime   json.Number            `json:"responseTime"`
	// Config source information scraped from the instance's
	// /metrics endpoint. Excluded from JSON output.
	Metrics        Metrics                `json:"-"`
}
127
// Metrics holds the values an instance reports through its
// arvados_config_source_timestamp_seconds metric. Both fields are
// zero if the metric was not found.
type Metrics struct {
	// The metric's value, interpreted as seconds since the Unix epoch.
	ConfigSourceTimestamp time.Time
	// The metric's sha256 label (hex digest of the config source).
	ConfigSourceSHA256    string
}
132
// ServiceHealth summarizes the checks of all instances of one service.
type ServiceHealth struct {
	// "OK" if at least one instance passed its check, otherwise
	// "ERROR" (including when no instance was checked at all).
	Health string `json:"health"`
	// Number of instances that passed their checks.
	N      int    `json:"n"`
}
137
138 func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
139         agg.setupOnce.Do(agg.setup)
140         resp := ClusterHealthResponse{
141                 Health:   "OK",
142                 Checks:   make(map[string]CheckResult),
143                 Services: make(map[arvados.ServiceName]ServiceHealth),
144         }
145
146         mtx := sync.Mutex{}
147         wg := sync.WaitGroup{}
148         for svcName, svc := range agg.Cluster.Services.Map() {
149                 // Ensure svc is listed in resp.Services.
150                 mtx.Lock()
151                 if _, ok := resp.Services[svcName]; !ok {
152                         resp.Services[svcName] = ServiceHealth{Health: "ERROR"}
153                 }
154                 mtx.Unlock()
155
156                 checkURLs := map[arvados.URL]bool{}
157                 for addr := range svc.InternalURLs {
158                         checkURLs[addr] = true
159                 }
160                 if len(checkURLs) == 0 && svc.ExternalURL.Host != "" {
161                         checkURLs[svc.ExternalURL] = true
162                 }
163                 for addr := range checkURLs {
164                         wg.Add(1)
165                         go func(svcName arvados.ServiceName, addr arvados.URL) {
166                                 defer wg.Done()
167                                 var result CheckResult
168                                 pingURL, err := agg.pingURL(addr)
169                                 if err != nil {
170                                         result = CheckResult{
171                                                 Health: "ERROR",
172                                                 Error:  err.Error(),
173                                         }
174                                 } else {
175                                         result = agg.ping(pingURL)
176                                         m, err := agg.metrics(pingURL)
177                                         if err != nil {
178                                                 result.Error = "metrics: " + err.Error()
179                                         }
180                                         result.Metrics = m
181                                 }
182
183                                 mtx.Lock()
184                                 defer mtx.Unlock()
185                                 resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result
186                                 if result.Health == "OK" {
187                                         h := resp.Services[svcName]
188                                         h.N++
189                                         h.Health = "OK"
190                                         resp.Services[svcName] = h
191                                 } else {
192                                         resp.Health = "ERROR"
193                                 }
194                         }(svcName, addr)
195                 }
196         }
197         wg.Wait()
198
199         // Report ERROR if a needed service didn't fail any checks
200         // merely because it isn't configured to run anywhere.
201         for _, sh := range resp.Services {
202                 if sh.Health != "OK" {
203                         resp.Health = "ERROR"
204                         break
205                 }
206         }
207
208         var newest Metrics
209         for _, result := range resp.Checks {
210                 if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
211                         newest = result.Metrics
212                 }
213         }
214         var mismatches []string
215         for target, result := range resp.Checks {
216                 if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
217                         mismatches = append(mismatches, target)
218                 }
219         }
220         for _, target := range mismatches {
221                 msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
222                         strings.TrimSuffix(target, "/_health/ping"),
223                         resp.Checks[target].Metrics.ConfigSourceSHA256,
224                         newest.ConfigSourceTimestamp.Format(time.RFC3339))
225                 resp.Errors = append(resp.Errors, msg)
226                 resp.Health = "ERROR"
227         }
228         return resp
229 }
230
231 func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) {
232         base := url.URL(svcURL)
233         return base.Parse("/_health/ping")
234 }
235
236 func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
237         t0 := time.Now()
238         defer func() {
239                 result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
240         }()
241         result.Health = "ERROR"
242
243         ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
244         defer cancel()
245         req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
246         if err != nil {
247                 result.Error = err.Error()
248                 return
249         }
250         req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
251
252         // Avoid workbench1's redirect-http-to-https feature
253         req.Header.Set("X-Forwarded-Proto", "https")
254
255         resp, err := agg.httpClient.Do(req)
256         if err != nil {
257                 result.Error = err.Error()
258                 return
259         }
260         result.HTTPStatusCode = resp.StatusCode
261         result.HTTPStatusText = resp.Status
262         err = json.NewDecoder(resp.Body).Decode(&result.Response)
263         if err != nil {
264                 result.Error = fmt.Sprintf("cannot decode response: %s", err)
265         } else if resp.StatusCode != http.StatusOK {
266                 result.Error = fmt.Sprintf("HTTP %d %s", resp.StatusCode, resp.Status)
267         } else if h, _ := result.Response["health"].(string); h != "OK" {
268                 if e, ok := result.Response["error"].(string); ok && e != "" {
269                         result.Error = e
270                         return
271                 } else {
272                         result.Error = fmt.Sprintf("health=%q in ping response", h)
273                         return
274                 }
275         }
276         result.Health = "OK"
277         return
278 }
279
var (
	// reMetric matches a metrics line of the form
	//	name{sha256="<lowercase hex>"} <value>
	// capturing the name, the hex digest and the value. The value
	// must start with a digit and be at least two characters long,
	// using only digits, '.', 'e' and '+'.
	reMetric = regexp.MustCompile(`([a-z_]+){sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
)
281
282 func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
283         metricsURL, err := pingURL.Parse("/metrics")
284         if err != nil {
285                 return
286         }
287         ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
288         defer cancel()
289         req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
290         if err != nil {
291                 return
292         }
293         req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
294
295         // Avoid workbench1's redirect-http-to-https feature
296         req.Header.Set("X-Forwarded-Proto", "https")
297
298         resp, err := agg.httpClient.Do(req)
299         if err != nil {
300                 return
301         } else if resp.StatusCode != http.StatusOK {
302                 err = fmt.Errorf("%s: HTTP %d %s", metricsURL.String(), resp.StatusCode, resp.Status)
303                 return
304         }
305
306         scanner := bufio.NewScanner(resp.Body)
307         for scanner.Scan() {
308                 m := reMetric.FindSubmatch(scanner.Bytes())
309                 if len(m) != 4 || string(m[1]) != "arvados_config_source_timestamp_seconds" {
310                         continue
311                 }
312                 result.ConfigSourceSHA256 = string(m[2])
313                 unixtime, _ := strconv.ParseFloat(string(m[3]), 64)
314                 result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
315         }
316         if err = scanner.Err(); err != nil {
317                 err = fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
318                 return
319         }
320         return
321 }
322
323 func (agg *Aggregator) checkAuth(req *http.Request) bool {
324         creds := auth.CredentialsFromRequest(req)
325         for _, token := range creds.Tokens {
326                 if token != "" && token == agg.Cluster.ManagementToken {
327                         return true
328                 }
329         }
330         return false
331 }
332
// errSilent marks a failure that has already been reported (for
// example by flag parsing). RunCommand exits non-zero on it without
// printing anything further.
var errSilent error = errors.New("")
334
335 var CheckCommand cmd.Handler = checkCommand{}
336
337 type checkCommand struct{}
338
339 func (ccmd checkCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
340         logger := ctxlog.New(stderr, "json", "info")
341         ctx := ctxlog.Context(context.Background(), logger)
342         err := ccmd.run(ctx, prog, args, stdin, stdout, stderr)
343         if err != nil {
344                 if err != errSilent {
345                         fmt.Fprintln(stdout, err.Error())
346                 }
347                 return 1
348         }
349         return 0
350 }
351
352 func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) error {
353         flags := flag.NewFlagSet("", flag.ContinueOnError)
354         flags.SetOutput(stderr)
355         loader := config.NewLoader(stdin, ctxlog.New(stderr, "text", "info"))
356         loader.SetupFlags(flags)
357         versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
358         timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
359         if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
360                 // cmd.ParseFlags already reported the error
361                 return errSilent
362         } else if *versionFlag {
363                 cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
364                 return nil
365         }
366         cfg, err := loader.Load()
367         if err != nil {
368                 return err
369         }
370         cluster, err := cfg.GetCluster("")
371         if err != nil {
372                 return err
373         }
374         logger := ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
375                 "ClusterID": cluster.ClusterID,
376         })
377         ctx = ctxlog.Context(ctx, logger)
378         agg := Aggregator{Cluster: cluster, timeout: arvados.Duration(*timeout)}
379         resp := agg.ClusterHealth()
380         buf, err := yaml.Marshal(resp)
381         if err != nil {
382                 return err
383         }
384         stdout.Write(buf)
385         if resp.Health != "OK" {
386                 return fmt.Errorf("health check failed")
387         }
388         return nil
389 }