import (
"bufio"
+ "bytes"
"context"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"io"
+ "net"
"net/http"
"net/url"
"regexp"
"git.arvados.org/arvados.git/sdk/go/auth"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
-const defaultTimeout = arvados.Duration(2 * time.Second)
+const (
+	// defaultTimeout is the default per-request time limit for
+	// health-check probes (overridable via the -timeout flag).
+	defaultTimeout = arvados.Duration(2 * time.Second)
+	// maxClockSkew is the warning threshold: if the spread between
+	// the earliest and latest "Date" timestamps seen in individual
+	// health-check responses exceeds maxClockSkew plus the longest
+	// response time, cluster health is reported as ERROR.
+	maxClockSkew = time.Minute
+)
// Aggregator implements service.Handler. It handles "GET /_health/all"
// by checking the health of all configured services on the cluster
// If non-nil, Log is called after handling each request.
Log func(*http.Request, error)
+
+ // If non-nil, report clock skew on each health-check.
+ MetricClockSkew prometheus.Gauge
}
func (agg *Aggregator) setup() {
// anywhere."
Services map[arvados.ServiceName]ServiceHealth `json:"services"`
+ // Difference between min/max timestamps in individual
+ // health-check responses.
+ ClockSkew arvados.Duration
+
Errors []string `json:"errors"`
}
HTTPStatusText string `json:",omitempty"`
Response map[string]interface{} `json:"response"`
ResponseTime json.Number `json:"responseTime"`
+ ClockTime time.Time `json:"clockTime"`
Metrics Metrics `json:"-"`
+ respTime time.Duration
}
type Metrics struct {
}
type ServiceHealth struct {
- Health string `json:"health"`
+ Health string `json:"health"` // "OK", "ERROR", or "SKIP"
N int `json:"n"`
}
// Ensure svc is listed in resp.Services.
mtx.Lock()
if _, ok := resp.Services[svcName]; !ok {
- resp.Services[svcName] = ServiceHealth{Health: "ERROR"}
+ resp.Services[svcName] = ServiceHealth{Health: "MISSING"}
}
mtx.Unlock()
}
} else {
result = agg.ping(pingURL)
- m, err := agg.metrics(pingURL)
- if err != nil {
- result.Error = "metrics: " + err.Error()
+ if result.Health != "SKIP" {
+ m, err := agg.metrics(pingURL)
+ if err != nil && result.Error == "" {
+ result.Error = "metrics: " + err.Error()
+ }
+ result.Metrics = m
}
- result.Metrics = m
}
mtx.Lock()
defer mtx.Unlock()
resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result
- if result.Health == "OK" {
+ if result.Health == "OK" || result.Health == "SKIP" {
h := resp.Services[svcName]
h.N++
- h.Health = "OK"
+ if result.Health == "OK" || h.N == 1 {
+ // "" => "SKIP" or "OK"
+ // "SKIP" => "OK"
+ h.Health = result.Health
+ }
resp.Services[svcName] = h
} else {
resp.Health = "ERROR"
+ resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: %s", svcName, result.Health, result.Error))
}
}(svcName, addr)
}
// Report ERROR if a needed service didn't fail any checks
// merely because it isn't configured to run anywhere.
- for _, sh := range resp.Services {
- if sh.Health != "OK" {
- resp.Health = "ERROR"
- break
+ for svcName, sh := range resp.Services {
+ switch svcName {
+ case arvados.ServiceNameDispatchCloud,
+ arvados.ServiceNameDispatchLSF:
+ // ok to not run any given dispatcher
+ case arvados.ServiceNameHealth,
+ arvados.ServiceNameWorkbench1,
+ arvados.ServiceNameWorkbench2:
+ // typically doesn't have InternalURLs in config
+ default:
+ if sh.Health != "OK" && sh.Health != "SKIP" {
+ resp.Health = "ERROR"
+ resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: no InternalURLs configured", svcName, sh.Health))
+ continue
+ }
}
}
+ var maxResponseTime time.Duration
+ var clockMin, clockMax time.Time
+ for _, result := range resp.Checks {
+ if result.ClockTime.IsZero() {
+ continue
+ }
+ if clockMin.IsZero() || result.ClockTime.Before(clockMin) {
+ clockMin = result.ClockTime
+ }
+ if result.ClockTime.After(clockMax) {
+ clockMax = result.ClockTime
+ }
+ if result.respTime > maxResponseTime {
+ maxResponseTime = result.respTime
+ }
+ }
+ skew := clockMax.Sub(clockMin)
+ resp.ClockSkew = arvados.Duration(skew)
+ if skew > maxClockSkew+maxResponseTime {
+ msg := fmt.Sprintf("clock skew detected: maximum timestamp spread is %s (exceeds warning threshold of %s)", resp.ClockSkew, arvados.Duration(maxClockSkew))
+ resp.Errors = append(resp.Errors, msg)
+ resp.Health = "ERROR"
+ }
+ if agg.MetricClockSkew != nil {
+ agg.MetricClockSkew.Set(skew.Seconds())
+ }
+
var newest Metrics
for _, result := range resp.Checks {
if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
t0 := time.Now()
defer func() {
- result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+ result.respTime = time.Since(t0)
+ result.ResponseTime = json.Number(fmt.Sprintf("%.6f", result.respTime.Seconds()))
}()
result.Health = "ERROR"
req.Header.Set("X-Forwarded-Proto", "https")
resp, err := agg.httpClient.Do(req)
+ if urlerr, ok := err.(*url.Error); ok {
+ if neterr, ok := urlerr.Err.(*net.OpError); ok && isLocalHost(target.Hostname()) {
+ result = CheckResult{
+ Health: "SKIP",
+ Error: neterr.Error(),
+ }
+ err = nil
+ return
+ }
+ }
if err != nil {
result.Error = err.Error()
return
}
}
result.Health = "OK"
+ result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
return
}
return
}
+// isLocalHost reports whether host is an easily recognizable local
+// address: 0.0.0.0 (the IPv4 unspecified address), any 127.x.x.x
+// loopback address, ::1, or the literal name "localhost"
+// (case-insensitive).
+//
+// net.ParseIP returns nil for non-IP strings such as "localhost";
+// the net.IP methods are nil-safe (IsLoopback yields false, To4
+// yields nil), so in that case only the EqualFold comparison can
+// match. Note "::" (the IPv6 unspecified address) is NOT matched.
+func isLocalHost(host string) bool {
+	ip := net.ParseIP(host)
+	return ip.IsLoopback() || bytes.Equal(ip.To4(), []byte{0, 0, 0, 0}) || strings.EqualFold(host, "localhost")
+}
+
func (agg *Aggregator) checkAuth(req *http.Request) bool {
creds := auth.CredentialsFromRequest(req)
for _, token := range creds.Tokens {
loader.SetupFlags(flags)
versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
+ outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode shows errors as plain text, is silent on success)")
if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
// cmd.ParseFlags already reported the error
return errSilent
ctx = ctxlog.Context(ctx, logger)
agg := Aggregator{Cluster: cluster, timeout: arvados.Duration(*timeout)}
resp := agg.ClusterHealth()
- buf, err := yaml.Marshal(resp)
- if err != nil {
- return err
+ if *outputYAML {
+ y, err := yaml.Marshal(resp)
+ if err != nil {
+ return err
+ }
+ stdout.Write(y)
+ if resp.Health != "OK" {
+ return errSilent
+ }
+ return nil
}
- stdout.Write(buf)
if resp.Health != "OK" {
- return fmt.Errorf("health check failed")
+ for _, msg := range resp.Errors {
+ fmt.Fprintln(stdout, msg)
+ }
+ fmt.Fprintln(stderr, "health check failed")
+ return errSilent
}
return nil
}