16345: "arvados-server check" detects clock skew.
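
Each health-check result now records the server's clock time, parsed
from the HTTP Date response header, and the aggregator computes the
spread between the earliest and latest timestamps across all checks.
A slow response can inflate the apparent spread by up to its own
duration, so the spread only counts as skew once it exceeds one minute
plus the slowest response time. For example, timestamps of 12:00:00
and 12:01:30 with a slowest response of 5s exceed the 65s threshold,
and overall cluster health becomes ERROR. The measured skew is also
exported through the optional Prometheus gauge MetricClockSkew.

Related changes: a connection error to a loopback address is reported
as SKIP rather than ERROR (such a service is presumably reachable only
from its own host); a service with no InternalURLs configured is
reported as MISSING; and "arvados-server check" now prints errors as
plain text and is silent on success, with the new -yaml flag restoring
the full YAML report.
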
sdk/go/health/aggregator.go
index 249df5e23b5e74904d061a3bb63fbad2f2e2b2f6..f473eff3537bba9186d41363d132fd980d94c382 100644
@@ -6,6 +6,7 @@ package health
 
 import (
        "bufio"
+       "bytes"
        "context"
        "crypto/tls"
        "encoding/json"
@@ -13,6 +14,7 @@ import (
        "flag"
        "fmt"
        "io"
+       "net"
        "net/http"
        "net/url"
        "regexp"
@@ -27,10 +29,14 @@ import (
        "git.arvados.org/arvados.git/sdk/go/auth"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
 
-const defaultTimeout = arvados.Duration(2 * time.Second)
+const (
+       defaultTimeout = arvados.Duration(2 * time.Second)
+       maxClockSkew   = time.Minute
+)
 
 // Aggregator implements service.Handler. It handles "GET /_health/all"
 // by checking the health of all configured services on the cluster
@@ -44,6 +50,9 @@ type Aggregator struct {
 
        // If non-nil, Log is called after handling each request.
        Log func(*http.Request, error)
+
+       // If non-nil, MetricClockSkew is set to the measured clock skew on each health-check.
+       MetricClockSkew prometheus.Gauge
 }
 
 func (agg *Aggregator) setup() {
@@ -112,6 +121,10 @@ type ClusterHealthResponse struct {
        // anywhere."
        Services map[arvados.ServiceName]ServiceHealth `json:"services"`
 
+       // Difference between min/max timestamps in individual
+       // health-check responses.
+       ClockSkew arvados.Duration
+
        Errors []string `json:"errors"`
 }
 
@@ -122,7 +135,9 @@ type CheckResult struct {
        HTTPStatusText string                 `json:",omitempty"`
        Response       map[string]interface{} `json:"response"`
        ResponseTime   json.Number            `json:"responseTime"`
+       ClockTime      time.Time              `json:"clockTime"`
        Metrics        Metrics                `json:"-"`
+       respTime       time.Duration // not marshaled; used to compute clock-skew slack
 }
 
 type Metrics struct {
@@ -131,7 +146,7 @@ type Metrics struct {
 }
 
 type ServiceHealth struct {
-       Health string `json:"health"`
+       Health string `json:"health"` // "OK", "SKIP", or "MISSING"
        N      int    `json:"n"`
 }
 
@@ -149,7 +164,7 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
                // Ensure svc is listed in resp.Services.
                mtx.Lock()
                if _, ok := resp.Services[svcName]; !ok {
-                       resp.Services[svcName] = ServiceHealth{Health: "ERROR"}
+                       resp.Services[svcName] = ServiceHealth{Health: "MISSING"}
                }
                mtx.Unlock()
 
@@ -173,23 +188,30 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
                                        }
                                } else {
                                        result = agg.ping(pingURL)
-                                       m, err := agg.metrics(pingURL)
-                                       if err != nil {
-                                               result.Error = "metrics: " + err.Error()
+                                       if result.Health != "SKIP" {
+                                               m, err := agg.metrics(pingURL)
+                                               if err != nil && result.Error == "" {
+                                                       result.Error = "metrics: " + err.Error()
+                                               }
+                                               result.Metrics = m
                                        }
-                                       result.Metrics = m
                                }
 
                                mtx.Lock()
                                defer mtx.Unlock()
                                resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result
-                               if result.Health == "OK" {
+                               if result.Health == "OK" || result.Health == "SKIP" {
                                        h := resp.Services[svcName]
                                        h.N++
-                                       h.Health = "OK"
+                                       if result.Health == "OK" || h.N == 1 {
+                                               // "MISSING" => "SKIP" or "OK"
+                                               // "SKIP" => "OK"
+                                               h.Health = result.Health
+                                       }
                                        resp.Services[svcName] = h
                                } else {
                                        resp.Health = "ERROR"
+                                       resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: %s", svcName, result.Health, result.Error))
                                }
                        }(svcName, addr)
                }
@@ -198,13 +220,51 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
 
        // Report ERROR if a needed service didn't fail any checks
        // merely because it isn't configured to run anywhere.
-       for _, sh := range resp.Services {
-               if sh.Health != "OK" {
-                       resp.Health = "ERROR"
-                       break
+       for svcName, sh := range resp.Services {
+               switch svcName {
+               case arvados.ServiceNameDispatchCloud,
+                       arvados.ServiceNameDispatchLSF:
+                       // ok to not run any given dispatcher
+               case arvados.ServiceNameHealth,
+                       arvados.ServiceNameWorkbench1,
+                       arvados.ServiceNameWorkbench2:
+                       // typically doesn't have InternalURLs in config
+               default:
+                       if sh.Health != "OK" && sh.Health != "SKIP" {
+                               resp.Health = "ERROR"
+                               resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: no InternalURLs configured", svcName, sh.Health))
+                               continue
+                       }
                }
        }
 
+       var maxResponseTime time.Duration
+       var clockMin, clockMax time.Time
+       for _, result := range resp.Checks {
+               if result.ClockTime.IsZero() {
+                       continue
+               }
+               if clockMin.IsZero() || result.ClockTime.Before(clockMin) {
+                       clockMin = result.ClockTime
+               }
+               if result.ClockTime.After(clockMax) {
+                       clockMax = result.ClockTime
+               }
+               if result.respTime > maxResponseTime {
+                       maxResponseTime = result.respTime
+               }
+       }
+       skew := clockMax.Sub(clockMin)
+       resp.ClockSkew = arvados.Duration(skew)
+       if skew > maxClockSkew+maxResponseTime {
+               msg := fmt.Sprintf("clock skew detected: maximum timestamp spread is %s (exceeds warning threshold of %s)", resp.ClockSkew, arvados.Duration(maxClockSkew))
+               resp.Errors = append(resp.Errors, msg)
+               resp.Health = "ERROR"
+       }
+       if agg.MetricClockSkew != nil {
+               agg.MetricClockSkew.Set(skew.Seconds())
+       }
+
        var newest Metrics
        for _, result := range resp.Checks {
                if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
@@ -236,7 +296,8 @@ func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) {
 func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
        t0 := time.Now()
        defer func() {
-               result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+               result.respTime = time.Since(t0)
+               result.ResponseTime = json.Number(fmt.Sprintf("%.6f", result.respTime.Seconds()))
        }()
        result.Health = "ERROR"
 
@@ -253,6 +314,16 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
        req.Header.Set("X-Forwarded-Proto", "https")
 
        resp, err := agg.httpClient.Do(req)
+       if urlerr, ok := err.(*url.Error); ok {
+               if neterr, ok := urlerr.Err.(*net.OpError); ok && isLocalHost(target.Hostname()) {
+                       result = CheckResult{
+                               Health: "SKIP",
+                               Error:  neterr.Error(),
+                       }
+                       err = nil
+                       return
+               }
+       }
        if err != nil {
                result.Error = err.Error()
                return
@@ -274,6 +345,7 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
                }
        }
        result.Health = "OK"
+       result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
        return
 }
 
@@ -320,6 +392,13 @@ func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
        return
 }
 
+// isLocalHost reports whether host is an easily recognizable loopback
+// or unspecified address: 0.0.0.0, 127.x.x.x, ::1, or localhost.
+func isLocalHost(host string) bool {
+       ip := net.ParseIP(host)
+       return ip.IsLoopback() || bytes.Equal(ip.To4(), []byte{0, 0, 0, 0}) || strings.EqualFold(host, "localhost")
+}
+
 func (agg *Aggregator) checkAuth(req *http.Request) bool {
        creds := auth.CredentialsFromRequest(req)
        for _, token := range creds.Tokens {
@@ -356,6 +435,7 @@ func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, st
        loader.SetupFlags(flags)
        versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
        timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
+       outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode prints errors as plain text and is silent on success)")
        if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
                // cmd.ParseFlags already reported the error
                return errSilent
@@ -377,13 +457,23 @@ func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, st
        ctx = ctxlog.Context(ctx, logger)
        agg := Aggregator{Cluster: cluster, timeout: arvados.Duration(*timeout)}
        resp := agg.ClusterHealth()
-       buf, err := yaml.Marshal(resp)
-       if err != nil {
-               return err
+       if *outputYAML {
+               y, err := yaml.Marshal(resp)
+               if err != nil {
+                       return err
+               }
+               stdout.Write(y)
+               if resp.Health != "OK" {
+                       return errSilent
+               }
+               return nil
        }
-       stdout.Write(buf)
        if resp.Health != "OK" {
-               return fmt.Errorf("health check failed")
+               for _, msg := range resp.Errors {
+                       fmt.Fprintln(stdout, msg)
+               }
+               fmt.Fprintln(stderr, "health check failed")
+               return errSilent
        }
        return nil
 }
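
For intuition, the skew check above behaves like the following
self-contained sketch. The three timestamps and the 5-second slowest
response time are hypothetical values, not taken from this commit:

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxClockSkew = time.Minute
	// Hypothetical clock times collected from three services' Date headers.
	clocks := []time.Time{
		time.Date(2021, 11, 1, 12, 0, 0, 0, time.UTC),
		time.Date(2021, 11, 1, 12, 0, 20, 0, time.UTC),
		time.Date(2021, 11, 1, 12, 1, 30, 0, time.UTC),
	}
	maxResponseTime := 5 * time.Second // slowest health-check response

	var clockMin, clockMax time.Time
	for _, t := range clocks {
		if clockMin.IsZero() || t.Before(clockMin) {
			clockMin = t
		}
		if t.After(clockMax) {
			clockMax = t
		}
	}
	skew := clockMax.Sub(clockMin) // 1m30s here
	// A slow response can inflate the apparent spread by up to its own
	// duration, so allow that much slack before reporting an error.
	if skew > maxClockSkew+maxResponseTime {
		fmt.Printf("clock skew detected: %s exceeds threshold %s\n",
			skew, maxClockSkew+maxResponseTime)
	}
}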
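
A minimal sketch of how a caller might wire the new MetricClockSkew
field to a Prometheus registry. The helper function, the metric name,
and the empty cluster config are illustrative assumptions, not part of
this commit; per the diff, the aggregator calls gauge.Set(skew.Seconds())
on each ClusterHealth() run:

package main

import (
	"fmt"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/health"
	"github.com/prometheus/client_golang/prometheus"
)

// newHealthAggregator is hypothetical: it registers a clock-skew gauge
// and hands it to the aggregator via the new MetricClockSkew field.
func newHealthAggregator(cluster *arvados.Cluster, reg *prometheus.Registry) *health.Aggregator {
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "arvados_health_clock_skew_seconds", // assumed metric name
		Help: "Spread between min and max clock times reported by services.",
	})
	reg.MustRegister(gauge)
	return &health.Aggregator{
		Cluster:         cluster,
		MetricClockSkew: gauge,
	}
}

func main() {
	reg := prometheus.NewRegistry()
	// In real use this would be a cluster config loaded from
	// /etc/arvados/config.yml; an empty one is enough to compile.
	agg := newHealthAggregator(&arvados.Cluster{}, reg)
	resp := agg.ClusterHealth()
	fmt.Println(resp.Health, "clock skew:", resp.ClockSkew)
}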