package health
import (
+ "bufio"
+ "bytes"
"context"
+ "crypto/tls"
"encoding/json"
"errors"
+ "flag"
"fmt"
+ "io"
"net"
"net/http"
+ "net/url"
+ "regexp"
+ "strconv"
+ "strings"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
-const defaultTimeout = arvados.Duration(2 * time.Second)
+const (
+ defaultTimeout = arvados.Duration(2 * time.Second)
+ maxClockSkew = time.Minute
+)
-// Aggregator implements http.Handler. It handles "GET /_health/all"
+// Aggregator implements service.Handler. It handles "GET /_health/all"
// by checking the health of all configured services on the cluster
// and responding 200 if everything is healthy.
type Aggregator struct {
httpClient *http.Client
timeout arvados.Duration
- Config *arvados.Config
+ Cluster *arvados.Cluster
// If non-nil, Log is called after handling each request.
Log func(*http.Request, error)
+
+ // If non-nil, report clock skew on each health-check.
+ MetricClockSkew prometheus.Gauge
}
func (agg *Aggregator) setup() {
- agg.httpClient = http.DefaultClient
+ agg.httpClient = &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: agg.Cluster.TLS.Insecure,
+ },
+ },
+ }
if agg.timeout == 0 {
// this is always the case, except in the test suite
agg.timeout = defaultTimeout
}
}
+func (agg *Aggregator) CheckHealth() error {
+ return nil
+}
+
+func (agg *Aggregator) Done() <-chan struct{} {
+ return nil
+}
+
func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
agg.setupOnce.Do(agg.setup)
sendErr := func(statusCode int, err error) {
resp.Header().Set("Content-Type", "application/json")
- cluster, err := agg.Config.GetCluster("")
- if err != nil {
- err = fmt.Errorf("arvados.GetCluster(): %s", err)
- sendErr(http.StatusInternalServerError, err)
- return
- }
- if !agg.checkAuth(req, cluster) {
+ if !agg.checkAuth(req) {
sendErr(http.StatusUnauthorized, errUnauthorized)
return
}
- if req.URL.Path != "/_health/all" {
+ if req.URL.Path == "/_health/all" {
+ json.NewEncoder(resp).Encode(agg.ClusterHealth())
+ } else if req.URL.Path == "/_health/ping" {
+ resp.Write(healthyBody)
+ } else {
sendErr(http.StatusNotFound, errNotFound)
return
}
- json.NewEncoder(resp).Encode(agg.ClusterHealth(cluster))
if agg.Log != nil {
agg.Log(req, nil)
}
type ClusterHealthResponse struct {
// "OK" if all needed services are OK, otherwise "ERROR".
- Health string `json:"health"`
+ Health string
// An entry for each known health check of each known instance
// of each needed component: "instance of service S on node N
// reports health-check C is OK."
- Checks map[string]CheckResult `json:"checks"`
+ Checks map[string]CheckResult
// An entry for each service type: "service S is OK." This
// exposes problems that can't be expressed in Checks, like
// "service S is needed, but isn't configured to run
// anywhere."
- Services map[arvados.ServiceName]ServiceHealth `json:"services"`
+ Services map[arvados.ServiceName]ServiceHealth
+
+ // Difference between min/max timestamps in individual
+ // health-check responses.
+ ClockSkew arvados.Duration
+
+ Errors []string
}
type CheckResult struct {
- Health string `json:"health"`
- Error string `json:"error,omitempty"`
+ Health string
+ Error string `json:",omitempty"`
HTTPStatusCode int `json:",omitempty"`
- HTTPStatusText string `json:",omitempty"`
- Response map[string]interface{} `json:"response"`
- ResponseTime json.Number `json:"responseTime"`
+ Response map[string]interface{} `json:",omitempty"`
+ ResponseTime json.Number
+ ClockTime time.Time
+ Metrics
+ respTime time.Duration
+}
+
// Metrics holds the values extracted from a service's /metrics
// endpoint.
type Metrics struct {
	// Value of the arvados_config_source_timestamp_seconds metric.
	ConfigSourceTimestamp time.Time
	// sha256 label of the arvados_config_source_timestamp_seconds
	// metric.
	ConfigSourceSHA256 string
	// version label of the arvados_version_running metric.
	Version string
}
// ServiceHealth summarizes the health checks for one service type.
type ServiceHealth struct {
	// "OK", "ERROR", or "SKIP". ClusterHealth also uses "MISSING"
	// as the initial value, before any check has succeeded.
	Health string
	// Number of checks for this service that reported "OK" or
	// "SKIP".
	N int
}
-func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResponse {
+func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
+ agg.setupOnce.Do(agg.setup)
resp := ClusterHealthResponse{
Health: "OK",
Checks: make(map[string]CheckResult),
mtx := sync.Mutex{}
wg := sync.WaitGroup{}
- for profileName, profile := range cluster.NodeProfiles {
- for svc, addr := range profile.ServicePorts() {
- // Ensure svc is listed in resp.Services.
- mtx.Lock()
- if _, ok := resp.Services[svc]; !ok {
- resp.Services[svc] = ServiceHealth{Health: "ERROR"}
- }
- mtx.Unlock()
-
- if addr == "" {
- // svc is not expected on this node.
- continue
- }
+ for svcName, svc := range agg.Cluster.Services.Map() {
+ // Ensure svc is listed in resp.Services.
+ mtx.Lock()
+ if _, ok := resp.Services[svcName]; !ok {
+ resp.Services[svcName] = ServiceHealth{Health: "MISSING"}
+ }
+ mtx.Unlock()
+ checkURLs := map[arvados.URL]bool{}
+ for addr := range svc.InternalURLs {
+ checkURLs[addr] = true
+ }
+ if len(checkURLs) == 0 && svc.ExternalURL.Host != "" {
+ checkURLs[svc.ExternalURL] = true
+ }
+ for addr := range checkURLs {
wg.Add(1)
- go func(profileName string, svc arvados.ServiceName, addr string) {
+ go func(svcName arvados.ServiceName, addr arvados.URL) {
defer wg.Done()
var result CheckResult
- url, err := agg.pingURL(profileName, addr)
+ pingURL, err := agg.pingURL(addr)
if err != nil {
result = CheckResult{
Health: "ERROR",
Error: err.Error(),
}
} else {
- result = agg.ping(url, cluster)
+ result = agg.ping(pingURL)
+ if result.Health != "SKIP" {
+ m, err := agg.metrics(pingURL)
+ if err != nil && result.Error == "" {
+ result.Error = "metrics: " + err.Error()
+ }
+ result.Metrics = m
+ }
}
mtx.Lock()
defer mtx.Unlock()
- resp.Checks[fmt.Sprintf("%s+%s", svc, url)] = result
- if result.Health == "OK" {
- h := resp.Services[svc]
+ resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result
+ if result.Health == "OK" || result.Health == "SKIP" {
+ h := resp.Services[svcName]
h.N++
- h.Health = "OK"
- resp.Services[svc] = h
+ if result.Health == "OK" || h.N == 1 {
+ // "" => "SKIP" or "OK"
+ // "SKIP" => "OK"
+ h.Health = result.Health
+ }
+ resp.Services[svcName] = h
} else {
resp.Health = "ERROR"
+ resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: %s", svcName, result.Health, result.Error))
}
- }(profileName, svc, addr)
+ }(svcName, addr)
}
}
wg.Wait()
// Report ERROR if a needed service didn't fail any checks
// merely because it isn't configured to run anywhere.
- for _, sh := range resp.Services {
- if sh.Health != "OK" {
+ for svcName, sh := range resp.Services {
+ switch svcName {
+ case arvados.ServiceNameDispatchCloud,
+ arvados.ServiceNameDispatchLSF:
+ // ok to not run any given dispatcher
+ case arvados.ServiceNameHealth,
+ arvados.ServiceNameWorkbench1,
+ arvados.ServiceNameWorkbench2:
+ // typically doesn't have InternalURLs in config
+ default:
+ if sh.Health != "OK" && sh.Health != "SKIP" {
+ resp.Health = "ERROR"
+ resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: no InternalURLs configured", svcName, sh.Health))
+ continue
+ }
+ }
+ }
+
+ // Check for clock skew between hosts
+ var maxResponseTime time.Duration
+ var clockMin, clockMax time.Time
+ for _, result := range resp.Checks {
+ if result.ClockTime.IsZero() {
+ continue
+ }
+ if clockMin.IsZero() || result.ClockTime.Before(clockMin) {
+ clockMin = result.ClockTime
+ }
+ if result.ClockTime.After(clockMax) {
+ clockMax = result.ClockTime
+ }
+ if result.respTime > maxResponseTime {
+ maxResponseTime = result.respTime
+ }
+ }
+ skew := clockMax.Sub(clockMin)
+ resp.ClockSkew = arvados.Duration(skew)
+ if skew > maxClockSkew+maxResponseTime {
+ msg := fmt.Sprintf("clock skew detected: maximum timestamp spread is %s (exceeds warning threshold of %s)", resp.ClockSkew, arvados.Duration(maxClockSkew))
+ resp.Errors = append(resp.Errors, msg)
+ resp.Health = "ERROR"
+ }
+ if agg.MetricClockSkew != nil {
+ agg.MetricClockSkew.Set(skew.Seconds())
+ }
+
+ // Check for mismatched config files
+ var newest Metrics
+ for _, result := range resp.Checks {
+ if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
+ newest = result.Metrics
+ }
+ }
+ var mismatches []string
+ for target, result := range resp.Checks {
+ if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
+ mismatches = append(mismatches, target)
+ }
+ }
+ for _, target := range mismatches {
+ msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
+ strings.TrimSuffix(target, "/_health/ping"),
+ resp.Checks[target].Metrics.ConfigSourceSHA256,
+ newest.ConfigSourceTimestamp.Format(time.RFC3339))
+ resp.Errors = append(resp.Errors, msg)
+ resp.Health = "ERROR"
+ }
+
+ // Check for services running a different version than we are.
+ for target, result := range resp.Checks {
+ if result.Metrics.Version != "" && !sameVersion(result.Metrics.Version, cmd.Version.String()) {
+ msg := fmt.Sprintf("version mismatch: %s is running %s -- expected %s",
+ strings.TrimSuffix(target, "/_health/ping"),
+ result.Metrics.Version,
+ cmd.Version.String())
+ resp.Errors = append(resp.Errors, msg)
resp.Health = "ERROR"
- break
}
}
return resp
}
-func (agg *Aggregator) pingURL(node, addr string) (string, error) {
- _, port, err := net.SplitHostPort(addr)
- return "http://" + node + ":" + port + "/_health/ping", err
+func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) {
+ base := url.URL(svcURL)
+ return base.Parse("/_health/ping")
}
-func (agg *Aggregator) ping(url string, cluster *arvados.Cluster) (result CheckResult) {
+func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
t0 := time.Now()
-
- var err error
defer func() {
- result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
- if err != nil {
- result.Health, result.Error = "ERROR", err.Error()
- } else {
- result.Health = "OK"
- }
+ result.respTime = time.Since(t0)
+ result.ResponseTime = json.Number(fmt.Sprintf("%.6f", result.respTime.Seconds()))
}()
+ result.Health = "ERROR"
- req, err := http.NewRequest("GET", url, nil)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
+ defer cancel()
+ req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
if err != nil {
+ result.Error = err.Error()
return
}
- req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
+ req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
+
+ // Avoid workbench1's redirect-http-to-https feature
+ req.Header.Set("X-Forwarded-Proto", "https")
- ctx, cancel := context.WithTimeout(req.Context(), time.Duration(agg.timeout))
- defer cancel()
- req = req.WithContext(ctx)
resp, err := agg.httpClient.Do(req)
+ if urlerr, ok := err.(*url.Error); ok {
+ if neterr, ok := urlerr.Err.(*net.OpError); ok && isLocalHost(target.Hostname()) {
+ result = CheckResult{
+ Health: "SKIP",
+ Error: neterr.Error(),
+ }
+ err = nil
+ return
+ }
+ }
if err != nil {
+ result.Error = err.Error()
return
}
result.HTTPStatusCode = resp.StatusCode
- result.HTTPStatusText = resp.Status
err = json.NewDecoder(resp.Body).Decode(&result.Response)
if err != nil {
- err = fmt.Errorf("cannot decode response: %s", err)
+ result.Error = fmt.Sprintf("cannot decode response: %s", err)
} else if resp.StatusCode != http.StatusOK {
- err = fmt.Errorf("HTTP %d %s", resp.StatusCode, resp.Status)
+ result.Error = fmt.Sprintf("HTTP %d %s", resp.StatusCode, resp.Status)
} else if h, _ := result.Response["health"].(string); h != "OK" {
if e, ok := result.Response["error"].(string); ok && e != "" {
- err = errors.New(e)
+ result.Error = e
+ return
} else {
- err = fmt.Errorf("health=%q in ping response", h)
+ result.Error = fmt.Sprintf("health=%q in ping response", h)
+ return
+ }
+ }
+ result.Health = "OK"
+ result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
+ return
+}
+
+var (
+ reConfigMetric = regexp.MustCompile(`arvados_config_source_timestamp_seconds{sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
+ reVersionMetric = regexp.MustCompile(`arvados_version_running{version="([^"]+)"} 1`)
+)
+
+func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
+ metricsURL, err := pingURL.Parse("/metrics")
+ if err != nil {
+ return
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
+ defer cancel()
+ req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
+ if err != nil {
+ return
+ }
+ req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
+
+ // Avoid workbench1's redirect-http-to-https feature
+ req.Header.Set("X-Forwarded-Proto", "https")
+
+ resp, err := agg.httpClient.Do(req)
+ if err != nil {
+ return
+ } else if resp.StatusCode != http.StatusOK {
+ err = fmt.Errorf("%s: HTTP %d %s", metricsURL.String(), resp.StatusCode, resp.Status)
+ return
+ }
+
+ scanner := bufio.NewScanner(resp.Body)
+ for scanner.Scan() {
+ if m := reConfigMetric.FindSubmatch(scanner.Bytes()); len(m) == 3 && len(m[1]) > 0 {
+ result.ConfigSourceSHA256 = string(m[1])
+ unixtime, _ := strconv.ParseFloat(string(m[2]), 64)
+ result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
+ } else if m = reVersionMetric.FindSubmatch(scanner.Bytes()); len(m) == 2 && len(m[1]) > 0 {
+ result.Version = string(m[1])
}
}
+ if err = scanner.Err(); err != nil {
+ err = fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
+ return
+ }
return
}
-func (agg *Aggregator) checkAuth(req *http.Request, cluster *arvados.Cluster) bool {
- creds := auth.NewCredentialsFromHTTPRequest(req)
// isLocalHost reports whether host is an easily recognizable
// loopback address: 0.0.0.0, 127.x.x.x, ::1, or localhost (in any
// letter case).
func isLocalHost(host string) bool {
	if strings.EqualFold(host, "localhost") {
		return true
	}
	// ParseIP returns nil for anything that isn't an IP address;
	// both checks below are false for a nil IP.
	ip := net.ParseIP(host)
	if ip.IsLoopback() {
		return true
	}
	return bytes.Equal(ip.To4(), []byte{0, 0, 0, 0})
}
+
+func (agg *Aggregator) checkAuth(req *http.Request) bool {
+ creds := auth.CredentialsFromRequest(req)
for _, token := range creds.Tokens {
- if token != "" && token == cluster.ManagementToken {
+ if token != "" && token == agg.Cluster.ManagementToken {
return true
}
}
return false
}
+
+var errSilent = errors.New("")
+
+var CheckCommand cmd.Handler = checkCommand{}
+
+type checkCommand struct{}
+
+func (ccmd checkCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ logger := ctxlog.New(stderr, "json", "info")
+ ctx := ctxlog.Context(context.Background(), logger)
+ err := ccmd.run(ctx, prog, args, stdin, stdout, stderr)
+ if err != nil {
+ if err != errSilent {
+ fmt.Fprintln(stdout, err.Error())
+ }
+ return 1
+ }
+ return 0
+}
+
+func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) error {
+ flags := flag.NewFlagSet("", flag.ContinueOnError)
+ flags.SetOutput(stderr)
+ loader := config.NewLoader(stdin, ctxlog.New(stderr, "text", "info"))
+ loader.SetupFlags(flags)
+ versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
+ timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
+ outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode shows errors as plain text, is silent on success)")
+ if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
+ // cmd.ParseFlags already reported the error
+ return errSilent
+ } else if *versionFlag {
+ cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
+ return nil
+ }
+ cfg, err := loader.Load()
+ if err != nil {
+ return err
+ }
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ return err
+ }
+ logger := ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
+ "ClusterID": cluster.ClusterID,
+ })
+ ctx = ctxlog.Context(ctx, logger)
+ agg := Aggregator{Cluster: cluster, timeout: arvados.Duration(*timeout)}
+ resp := agg.ClusterHealth()
+ if *outputYAML {
+ y, err := yaml.Marshal(resp)
+ if err != nil {
+ return err
+ }
+ stdout.Write(y)
+ if resp.Health != "OK" {
+ return errSilent
+ }
+ return nil
+ }
+ if resp.Health != "OK" {
+ for _, msg := range resp.Errors {
+ fmt.Fprintln(stdout, msg)
+ }
+ fmt.Fprintln(stderr, "health check failed")
+ return errSilent
+ }
+ return nil
+}
+
// reGoVersion matches a trailing " (go1.2.3)" suffix in a version
// string.
var reGoVersion = regexp.MustCompile(` \(go\d+([\d.])*\)$`)

// sameVersion reports whether a and b denote the same version: either
// they are identical, or they differ only in that exactly one of them
// carries a " (go1.2.3)" suffix.
//
// This allows us to recognize a non-Go (rails) service as the same
// version as a Go service.
func sameVersion(a, b string) bool {
	if a == b {
		return true
	}
	aBase := reGoVersion.ReplaceAllLiteralString(a, "")
	bBase := reGoVersion.ReplaceAllLiteralString(b, "")
	aHasSuffix := aBase != a
	bHasSuffix := bBase != b
	// If both or neither have the suffix, a != b already settles
	// it; otherwise compare with the suffix removed.
	return aHasSuffix != bHasSuffix && aBase == bBase
}