18693: Use row locking only if enabled in config.
[arvados.git] / sdk / go / health / aggregator.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package health
6
7 import (
8         "bufio"
9         "bytes"
10         "context"
11         "crypto/tls"
12         "encoding/json"
13         "errors"
14         "flag"
15         "fmt"
16         "io"
17         "net"
18         "net/http"
19         "net/url"
20         "regexp"
21         "strconv"
22         "strings"
23         "sync"
24         "time"
25
26         "git.arvados.org/arvados.git/lib/cmd"
27         "git.arvados.org/arvados.git/lib/config"
28         "git.arvados.org/arvados.git/sdk/go/arvados"
29         "git.arvados.org/arvados.git/sdk/go/auth"
30         "git.arvados.org/arvados.git/sdk/go/ctxlog"
31         "github.com/ghodss/yaml"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/sirupsen/logrus"
34 )
35
36 const (
37         defaultTimeout = arvados.Duration(2 * time.Second)
38         maxClockSkew   = time.Minute
39 )
40
41 // Aggregator implements service.Handler. It handles "GET /_health/all"
42 // by checking the health of all configured services on the cluster
43 // and responding 200 if everything is healthy.
44 type Aggregator struct {
45         setupOnce  sync.Once
46         httpClient *http.Client
47         timeout    arvados.Duration
48
49         Cluster *arvados.Cluster
50
51         // If non-nil, Log is called after handling each request.
52         Log func(*http.Request, error)
53
54         // If non-nil, report clock skew on each health-check.
55         MetricClockSkew prometheus.Gauge
56 }
57
58 func (agg *Aggregator) setup() {
59         agg.httpClient = &http.Client{
60                 Transport: &http.Transport{
61                         TLSClientConfig: &tls.Config{
62                                 InsecureSkipVerify: agg.Cluster.TLS.Insecure,
63                         },
64                 },
65         }
66         if agg.timeout == 0 {
67                 // this is always the case, except in the test suite
68                 agg.timeout = defaultTimeout
69         }
70 }
71
72 func (agg *Aggregator) CheckHealth() error {
73         return nil
74 }
75
76 func (agg *Aggregator) Done() <-chan struct{} {
77         return nil
78 }
79
80 func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
81         agg.setupOnce.Do(agg.setup)
82         sendErr := func(statusCode int, err error) {
83                 resp.WriteHeader(statusCode)
84                 json.NewEncoder(resp).Encode(map[string]string{"error": err.Error()})
85                 if agg.Log != nil {
86                         agg.Log(req, err)
87                 }
88         }
89
90         resp.Header().Set("Content-Type", "application/json")
91
92         if !agg.checkAuth(req) {
93                 sendErr(http.StatusUnauthorized, errUnauthorized)
94                 return
95         }
96         if req.URL.Path == "/_health/all" {
97                 json.NewEncoder(resp).Encode(agg.ClusterHealth())
98         } else if req.URL.Path == "/_health/ping" {
99                 resp.Write(healthyBody)
100         } else {
101                 sendErr(http.StatusNotFound, errNotFound)
102                 return
103         }
104         if agg.Log != nil {
105                 agg.Log(req, nil)
106         }
107 }
108
109 type ClusterHealthResponse struct {
110         // "OK" if all needed services are OK, otherwise "ERROR".
111         Health string
112
113         // An entry for each known health check of each known instance
114         // of each needed component: "instance of service S on node N
115         // reports health-check C is OK."
116         Checks map[string]CheckResult
117
118         // An entry for each service type: "service S is OK." This
119         // exposes problems that can't be expressed in Checks, like
120         // "service S is needed, but isn't configured to run
121         // anywhere."
122         Services map[arvados.ServiceName]ServiceHealth
123
124         // Difference between min/max timestamps in individual
125         // health-check responses.
126         ClockSkew arvados.Duration
127
128         Errors []string
129 }
130
131 type CheckResult struct {
132         Health         string
133         Error          string                 `json:",omitempty"`
134         HTTPStatusCode int                    `json:",omitempty"`
135         Response       map[string]interface{} `json:",omitempty"`
136         ResponseTime   json.Number
137         ClockTime      time.Time
138         Server         string // "Server" header in http response
139         Metrics
140         respTime time.Duration
141 }
142
143 type Metrics struct {
144         ConfigSourceTimestamp time.Time
145         ConfigSourceSHA256    string
146         Version               string
147 }
148
149 type ServiceHealth struct {
150         Health string // "OK", "ERROR", or "SKIP"
151         N      int
152 }
153
154 func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
155         agg.setupOnce.Do(agg.setup)
156         resp := ClusterHealthResponse{
157                 Health:   "OK",
158                 Checks:   make(map[string]CheckResult),
159                 Services: make(map[arvados.ServiceName]ServiceHealth),
160         }
161
162         mtx := sync.Mutex{}
163         wg := sync.WaitGroup{}
164         for svcName, svc := range agg.Cluster.Services.Map() {
165                 // Ensure svc is listed in resp.Services.
166                 mtx.Lock()
167                 if _, ok := resp.Services[svcName]; !ok {
168                         resp.Services[svcName] = ServiceHealth{Health: "MISSING"}
169                 }
170                 mtx.Unlock()
171
172                 checkURLs := map[arvados.URL]bool{}
173                 for addr := range svc.InternalURLs {
174                         checkURLs[addr] = true
175                 }
176                 if len(checkURLs) == 0 && svc.ExternalURL.Host != "" {
177                         checkURLs[svc.ExternalURL] = true
178                 }
179                 for addr := range checkURLs {
180                         wg.Add(1)
181                         go func(svcName arvados.ServiceName, addr arvados.URL) {
182                                 defer wg.Done()
183                                 var result CheckResult
184                                 pingURL, err := agg.pingURL(addr)
185                                 if err != nil {
186                                         result = CheckResult{
187                                                 Health: "ERROR",
188                                                 Error:  err.Error(),
189                                         }
190                                 } else {
191                                         result = agg.ping(pingURL)
192                                         if result.Health != "SKIP" {
193                                                 m, err := agg.metrics(pingURL)
194                                                 if err != nil && result.Error == "" {
195                                                         result.Error = "metrics: " + err.Error()
196                                                 }
197                                                 result.Metrics = m
198                                         }
199                                 }
200
201                                 mtx.Lock()
202                                 defer mtx.Unlock()
203                                 resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result
204                                 if result.Health == "OK" || result.Health == "SKIP" {
205                                         h := resp.Services[svcName]
206                                         h.N++
207                                         if result.Health == "OK" || h.N == 1 {
208                                                 // "" => "SKIP" or "OK"
209                                                 // "SKIP" => "OK"
210                                                 h.Health = result.Health
211                                         }
212                                         resp.Services[svcName] = h
213                                 } else {
214                                         resp.Health = "ERROR"
215                                         resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: %s", svcName, result.Health, result.Error))
216                                 }
217                         }(svcName, addr)
218                 }
219         }
220         wg.Wait()
221
222         // Report ERROR if a needed service didn't fail any checks
223         // merely because it isn't configured to run anywhere.
224         for svcName, sh := range resp.Services {
225                 switch svcName {
226                 case arvados.ServiceNameDispatchCloud,
227                         arvados.ServiceNameDispatchLSF,
228                         arvados.ServiceNameDispatchSLURM:
229                         // ok to not run any given dispatcher
230                 case arvados.ServiceNameHealth,
231                         arvados.ServiceNameWorkbench1,
232                         arvados.ServiceNameWorkbench2:
233                         // typically doesn't have InternalURLs in config
234                 default:
235                         if sh.Health != "OK" && sh.Health != "SKIP" {
236                                 resp.Health = "ERROR"
237                                 resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: no InternalURLs configured", svcName, sh.Health))
238                                 continue
239                         }
240                 }
241         }
242
243         // Check for clock skew between hosts
244         var maxResponseTime time.Duration
245         var clockMin, clockMax time.Time
246         for _, result := range resp.Checks {
247                 if result.ClockTime.IsZero() {
248                         continue
249                 }
250                 if clockMin.IsZero() || result.ClockTime.Before(clockMin) {
251                         clockMin = result.ClockTime
252                 }
253                 if result.ClockTime.After(clockMax) {
254                         clockMax = result.ClockTime
255                 }
256                 if result.respTime > maxResponseTime {
257                         maxResponseTime = result.respTime
258                 }
259         }
260         skew := clockMax.Sub(clockMin)
261         resp.ClockSkew = arvados.Duration(skew)
262         if skew > maxClockSkew+maxResponseTime {
263                 msg := fmt.Sprintf("clock skew detected: maximum timestamp spread is %s (exceeds warning threshold of %s)", resp.ClockSkew, arvados.Duration(maxClockSkew))
264                 resp.Errors = append(resp.Errors, msg)
265                 resp.Health = "ERROR"
266         }
267         if agg.MetricClockSkew != nil {
268                 agg.MetricClockSkew.Set(skew.Seconds())
269         }
270
271         // Check for mismatched config files
272         var newest Metrics
273         for _, result := range resp.Checks {
274                 if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
275                         newest = result.Metrics
276                 }
277         }
278         var mismatches []string
279         for target, result := range resp.Checks {
280                 if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
281                         mismatches = append(mismatches, target)
282                 }
283         }
284         for _, target := range mismatches {
285                 msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
286                         strings.TrimSuffix(target, "/_health/ping"),
287                         resp.Checks[target].Metrics.ConfigSourceSHA256,
288                         newest.ConfigSourceTimestamp.Format(time.RFC3339))
289                 resp.Errors = append(resp.Errors, msg)
290                 resp.Health = "ERROR"
291         }
292
293         // Check for services running a different version than we are.
294         for target, result := range resp.Checks {
295                 if result.Metrics.Version != "" && !sameVersion(result.Metrics.Version, cmd.Version.String()) {
296                         msg := fmt.Sprintf("version mismatch: %s is running %s -- expected %s",
297                                 strings.TrimSuffix(target, "/_health/ping"),
298                                 result.Metrics.Version,
299                                 cmd.Version.String())
300                         resp.Errors = append(resp.Errors, msg)
301                         resp.Health = "ERROR"
302                 }
303         }
304         return resp
305 }
306
307 func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) {
308         base := url.URL(svcURL)
309         return base.Parse("/_health/ping")
310 }
311
312 func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
313         t0 := time.Now()
314         defer func() {
315                 result.respTime = time.Since(t0)
316                 result.ResponseTime = json.Number(fmt.Sprintf("%.6f", result.respTime.Seconds()))
317         }()
318         result.Health = "ERROR"
319
320         ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
321         defer cancel()
322         req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
323         if err != nil {
324                 result.Error = err.Error()
325                 return
326         }
327         req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
328
329         // Avoid workbench1's redirect-http-to-https feature
330         req.Header.Set("X-Forwarded-Proto", "https")
331
332         resp, err := agg.httpClient.Do(req)
333         if urlerr, ok := err.(*url.Error); ok {
334                 if neterr, ok := urlerr.Err.(*net.OpError); ok && isLocalHost(target.Hostname()) {
335                         result = CheckResult{
336                                 Health: "SKIP",
337                                 Error:  neterr.Error(),
338                         }
339                         err = nil
340                         return
341                 }
342         }
343         if err != nil {
344                 result.Error = err.Error()
345                 return
346         }
347         result.HTTPStatusCode = resp.StatusCode
348         err = json.NewDecoder(resp.Body).Decode(&result.Response)
349         if err != nil {
350                 result.Error = fmt.Sprintf("cannot decode response: %s", err)
351         } else if resp.StatusCode != http.StatusOK {
352                 result.Error = fmt.Sprintf("HTTP %d %s", resp.StatusCode, resp.Status)
353         } else if h, _ := result.Response["health"].(string); h != "OK" {
354                 if e, ok := result.Response["error"].(string); ok && e != "" {
355                         result.Error = e
356                         return
357                 } else {
358                         result.Error = fmt.Sprintf("health=%q in ping response", h)
359                         return
360                 }
361         }
362         result.Health = "OK"
363         result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
364         result.Server = resp.Header.Get("Server")
365         return
366 }
367
368 var (
369         reConfigMetric  = regexp.MustCompile(`arvados_config_source_timestamp_seconds{sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
370         reVersionMetric = regexp.MustCompile(`arvados_version_running{version="([^"]+)"} 1`)
371 )
372
373 func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
374         metricsURL, err := pingURL.Parse("/metrics")
375         if err != nil {
376                 return
377         }
378         ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
379         defer cancel()
380         req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
381         if err != nil {
382                 return
383         }
384         req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
385
386         // Avoid workbench1's redirect-http-to-https feature
387         req.Header.Set("X-Forwarded-Proto", "https")
388
389         resp, err := agg.httpClient.Do(req)
390         if err != nil {
391                 return
392         } else if resp.StatusCode != http.StatusOK {
393                 err = fmt.Errorf("%s: HTTP %d %s", metricsURL.String(), resp.StatusCode, resp.Status)
394                 return
395         }
396
397         scanner := bufio.NewScanner(resp.Body)
398         for scanner.Scan() {
399                 if m := reConfigMetric.FindSubmatch(scanner.Bytes()); len(m) == 3 && len(m[1]) > 0 {
400                         result.ConfigSourceSHA256 = string(m[1])
401                         unixtime, _ := strconv.ParseFloat(string(m[2]), 64)
402                         result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
403                 } else if m = reVersionMetric.FindSubmatch(scanner.Bytes()); len(m) == 2 && len(m[1]) > 0 {
404                         result.Version = string(m[1])
405                 }
406         }
407         if err = scanner.Err(); err != nil {
408                 err = fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
409                 return
410         }
411         return
412 }
413
414 // Test whether host is an easily recognizable loopback address:
415 // 0.0.0.0, 127.x.x.x, ::1, or localhost.
416 func isLocalHost(host string) bool {
417         ip := net.ParseIP(host)
418         return ip.IsLoopback() || bytes.Equal(ip.To4(), []byte{0, 0, 0, 0}) || strings.EqualFold(host, "localhost")
419 }
420
421 func (agg *Aggregator) checkAuth(req *http.Request) bool {
422         creds := auth.CredentialsFromRequest(req)
423         for _, token := range creds.Tokens {
424                 if token != "" && token == agg.Cluster.ManagementToken {
425                         return true
426                 }
427         }
428         return false
429 }
430
431 var errSilent = errors.New("")
432
433 var CheckCommand cmd.Handler = checkCommand{}
434
435 type checkCommand struct{}
436
437 func (ccmd checkCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
438         logger := ctxlog.New(stderr, "json", "info")
439         ctx := ctxlog.Context(context.Background(), logger)
440         err := ccmd.run(ctx, prog, args, stdin, stdout, stderr)
441         if err != nil {
442                 if err != errSilent {
443                         fmt.Fprintln(stderr, err.Error())
444                 }
445                 return 1
446         }
447         return 0
448 }
449
450 func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) error {
451         flags := flag.NewFlagSet("", flag.ContinueOnError)
452         flags.SetOutput(stderr)
453         loader := config.NewLoader(stdin, ctxlog.New(stderr, "text", "info"))
454         loader.SetupFlags(flags)
455         versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
456         timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
457         quiet := flags.Bool("quiet", false, "Silent on success (suppress 'health check OK' message on stderr)")
458         outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode prints 'health check OK' or plain text errors)")
459         if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
460                 // cmd.ParseFlags already reported the error
461                 return errSilent
462         } else if *versionFlag {
463                 cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
464                 return nil
465         }
466         cfg, err := loader.Load()
467         if err != nil {
468                 return err
469         }
470         cluster, err := cfg.GetCluster("")
471         if err != nil {
472                 return err
473         }
474         logger := ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
475                 "ClusterID": cluster.ClusterID,
476         })
477         ctx = ctxlog.Context(ctx, logger)
478         agg := Aggregator{Cluster: cluster, timeout: arvados.Duration(*timeout)}
479         resp := agg.ClusterHealth()
480         if *outputYAML {
481                 y, err := yaml.Marshal(resp)
482                 if err != nil {
483                         return err
484                 }
485                 stdout.Write(y)
486                 if resp.Health != "OK" {
487                         return errSilent
488                 }
489                 return nil
490         }
491         if resp.Health != "OK" {
492                 for _, msg := range resp.Errors {
493                         fmt.Fprintln(stderr, msg)
494                 }
495                 fmt.Fprintln(stderr, "health check failed")
496                 return errSilent
497         }
498         if !*quiet {
499                 fmt.Fprintln(stderr, "health check OK")
500         }
501         return nil
502 }
503
504 var reGoVersion = regexp.MustCompile(` \(go\d+([\d.])*\)$`)
505
506 // Return true if either a==b or the only difference is that one has a
507 // " (go1.2.3)" suffix and the other does not.
508 //
509 // This allows us to recognize a non-Go (rails) service as the same
510 // version as a Go service.
511 func sameVersion(a, b string) bool {
512         if a == b {
513                 return true
514         }
515         anogo := reGoVersion.ReplaceAllLiteralString(a, "")
516         bnogo := reGoVersion.ReplaceAllLiteralString(b, "")
517         if (anogo == a) != (bnogo == b) {
518                 // only one of a/b has a (go1.2.3) suffix, so compare
519                 // without that part
520                 return anogo == bnogo
521         }
522         // both or neither has a (go1.2.3) suffix, and we already know
523         // a!=b
524         return false
525 }