X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fe6606652dfe274627eadcef902d6e30d2856440..10db2d51a25a80840f299f09baada5588af991f3:/sdk/go/health/aggregator.go diff --git a/sdk/go/health/aggregator.go b/sdk/go/health/aggregator.go index 249df5e23b..f473eff353 100644 --- a/sdk/go/health/aggregator.go +++ b/sdk/go/health/aggregator.go @@ -6,6 +6,7 @@ package health import ( "bufio" + "bytes" "context" "crypto/tls" "encoding/json" @@ -13,6 +14,7 @@ import ( "flag" "fmt" "io" + "net" "net/http" "net/url" "regexp" @@ -27,10 +29,14 @@ import ( "git.arvados.org/arvados.git/sdk/go/auth" "git.arvados.org/arvados.git/sdk/go/ctxlog" "github.com/ghodss/yaml" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) -const defaultTimeout = arvados.Duration(2 * time.Second) +const ( + defaultTimeout = arvados.Duration(2 * time.Second) + maxClockSkew = time.Minute +) // Aggregator implements service.Handler. It handles "GET /_health/all" // by checking the health of all configured services on the cluster @@ -44,6 +50,9 @@ type Aggregator struct { // If non-nil, Log is called after handling each request. Log func(*http.Request, error) + + // If non-nil, report clock skew on each health-check. + MetricClockSkew prometheus.Gauge } func (agg *Aggregator) setup() { @@ -112,6 +121,10 @@ type ClusterHealthResponse struct { // anywhere." Services map[arvados.ServiceName]ServiceHealth `json:"services"` + // Difference between min/max timestamps in individual + // health-check responses. + ClockSkew arvados.Duration + Errors []string `json:"errors"` } @@ -122,7 +135,9 @@ type CheckResult struct { HTTPStatusText string `json:",omitempty"` Response map[string]interface{} `json:"response"` ResponseTime json.Number `json:"responseTime"` + ClockTime time.Time `json:"clockTime"` Metrics Metrics `json:"-"` + respTime time.Duration } type Metrics struct { @@ -131,7 +146,7 @@ type Metrics struct { } type ServiceHealth struct { - Health string `json:"health"` + Health string `json:"health"` // "OK", "ERROR", or "SKIP" N int `json:"n"` } @@ -149,7 +164,7 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse { // Ensure svc is listed in resp.Services. mtx.Lock() if _, ok := resp.Services[svcName]; !ok { - resp.Services[svcName] = ServiceHealth{Health: "ERROR"} + resp.Services[svcName] = ServiceHealth{Health: "MISSING"} } mtx.Unlock() @@ -173,23 +188,30 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse { } } else { result = agg.ping(pingURL) - m, err := agg.metrics(pingURL) - if err != nil { - result.Error = "metrics: " + err.Error() + if result.Health != "SKIP" { + m, err := agg.metrics(pingURL) + if err != nil && result.Error == "" { + result.Error = "metrics: " + err.Error() + } + result.Metrics = m } - result.Metrics = m } mtx.Lock() defer mtx.Unlock() resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result - if result.Health == "OK" { + if result.Health == "OK" || result.Health == "SKIP" { h := resp.Services[svcName] h.N++ - h.Health = "OK" + if result.Health == "OK" || h.N == 1 { + // "" => "SKIP" or "OK" + // "SKIP" => "OK" + h.Health = result.Health + } resp.Services[svcName] = h } else { resp.Health = "ERROR" + resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: %s", svcName, result.Health, result.Error)) } }(svcName, addr) } @@ -198,13 +220,51 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse { // Report ERROR if a needed service didn't fail any checks // merely because it isn't configured to run anywhere. 
- for _, sh := range resp.Services { - if sh.Health != "OK" { - resp.Health = "ERROR" - break + for svcName, sh := range resp.Services { + switch svcName { + case arvados.ServiceNameDispatchCloud, + arvados.ServiceNameDispatchLSF: + // ok to not run any given dispatcher + case arvados.ServiceNameHealth, + arvados.ServiceNameWorkbench1, + arvados.ServiceNameWorkbench2: + // typically doesn't have InternalURLs in config + default: + if sh.Health != "OK" && sh.Health != "SKIP" { + resp.Health = "ERROR" + resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: no InternalURLs configured", svcName, sh.Health)) + continue + } } } + var maxResponseTime time.Duration + var clockMin, clockMax time.Time + for _, result := range resp.Checks { + if result.ClockTime.IsZero() { + continue + } + if clockMin.IsZero() || result.ClockTime.Before(clockMin) { + clockMin = result.ClockTime + } + if result.ClockTime.After(clockMax) { + clockMax = result.ClockTime + } + if result.respTime > maxResponseTime { + maxResponseTime = result.respTime + } + } + skew := clockMax.Sub(clockMin) + resp.ClockSkew = arvados.Duration(skew) + if skew > maxClockSkew+maxResponseTime { + msg := fmt.Sprintf("clock skew detected: maximum timestamp spread is %s (exceeds warning threshold of %s)", resp.ClockSkew, arvados.Duration(maxClockSkew)) + resp.Errors = append(resp.Errors, msg) + resp.Health = "ERROR" + } + if agg.MetricClockSkew != nil { + agg.MetricClockSkew.Set(skew.Seconds()) + } + var newest Metrics for _, result := range resp.Checks { if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) { @@ -236,7 +296,8 @@ func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) { func (agg *Aggregator) ping(target *url.URL) (result CheckResult) { t0 := time.Now() defer func() { - result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds())) + result.respTime = time.Since(t0) + result.ResponseTime = json.Number(fmt.Sprintf("%.6f", result.respTime.Seconds())) }() result.Health = "ERROR" @@ -253,6 +314,16 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) { req.Header.Set("X-Forwarded-Proto", "https") resp, err := agg.httpClient.Do(req) + if urlerr, ok := err.(*url.Error); ok { + if neterr, ok := urlerr.Err.(*net.OpError); ok && isLocalHost(target.Hostname()) { + result = CheckResult{ + Health: "SKIP", + Error: neterr.Error(), + } + err = nil + return + } + } if err != nil { result.Error = err.Error() return @@ -274,6 +345,7 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) { } } result.Health = "OK" + result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date")) return } @@ -320,6 +392,13 @@ func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) { return } +// Test whether host is an easily recognizable loopback address: +// 0.0.0.0, 127.x.x.x, ::1, or localhost. 
+func isLocalHost(host string) bool { + ip := net.ParseIP(host) + return ip.IsLoopback() || bytes.Equal(ip.To4(), []byte{0, 0, 0, 0}) || strings.EqualFold(host, "localhost") +} + func (agg *Aggregator) checkAuth(req *http.Request) bool { creds := auth.CredentialsFromRequest(req) for _, token := range creds.Tokens { @@ -356,6 +435,7 @@ func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, st loader.SetupFlags(flags) versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0") timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses") + outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode shows errors as plain text, is silent on success)") if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok { // cmd.ParseFlags already reported the error return errSilent @@ -377,13 +457,23 @@ func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, st ctx = ctxlog.Context(ctx, logger) agg := Aggregator{Cluster: cluster, timeout: arvados.Duration(*timeout)} resp := agg.ClusterHealth() - buf, err := yaml.Marshal(resp) - if err != nil { - return err + if *outputYAML { + y, err := yaml.Marshal(resp) + if err != nil { + return err + } + stdout.Write(y) + if resp.Health != "OK" { + return errSilent + } + return nil } - stdout.Write(buf) if resp.Health != "OK" { - return fmt.Errorf("health check failed") + for _, msg := range resp.Errors { + fmt.Fprintln(stdout, msg) + } + fmt.Fprintln(stderr, "health check failed") + return errSilent } return nil }
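
For context, below is a minimal usage sketch (not part of this commit) of how a caller could wire the new MetricClockSkew field to a Prometheus gauge and read the ClockSkew value from the aggregated response. The package name, metric namespace/name, and both helper functions are illustrative assumptions rather than identifiers introduced by this change.

// Sketch only: the surrounding package, metric namespace/name, and helpers
// below are assumptions for illustration, not part of the Arvados SDK.
package healthexample

import (
	"fmt"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/health"
	"github.com/prometheus/client_golang/prometheus"
)

// newAggregator is a hypothetical helper: it registers a clock-skew gauge
// and hands it to the aggregator, which sets the gauge (in seconds) each
// time ClusterHealth() runs.
func newAggregator(cluster *arvados.Cluster, reg *prometheus.Registry) *health.Aggregator {
	clockSkew := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",                   // assumed namespace, not defined by this change
		Name:      "health_clock_skew_seconds", // assumed metric name, not defined by this change
		Help:      "Spread between the earliest and latest clock times reported by health-check responses.",
	})
	reg.MustRegister(clockSkew)
	return &health.Aggregator{
		Cluster:         cluster,
		MetricClockSkew: clockSkew,
	}
}

// reportClusterHealth runs one aggregated check and prints the overall
// status, the measured clock skew, and the number of collected errors.
func reportClusterHealth(agg *health.Aggregator) {
	resp := agg.ClusterHealth()
	fmt.Printf("health=%s clockSkew=%v errors=%d\n", resp.Health, resp.ClockSkew, len(resp.Errors))
}

As the diff above shows, the aggregator updates the gauge with the observed skew on every aggregated check and reports ERROR when the spread exceeds maxClockSkew plus the slowest response time. The reworked check subcommand now prints only the collected error lines (and stays silent on success) unless the new -yaml flag is given, which restores the full YAML report.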