1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
26 "git.arvados.org/arvados.git/lib/cmd"
27 "git.arvados.org/arvados.git/lib/config"
28 "git.arvados.org/arvados.git/sdk/go/arvados"
29 "git.arvados.org/arvados.git/sdk/go/auth"
30 "git.arvados.org/arvados.git/sdk/go/ctxlog"
31 "github.com/ghodss/yaml"
32 "github.com/prometheus/client_golang/prometheus"
33 "github.com/sirupsen/logrus"
// defaultTimeout is the value setup() assigns to Aggregator.timeout and
// the default for the check command's -timeout flag.
37 defaultTimeout = arvados.Duration(2 * time.Second)
// maxClockSkew is the timestamp spread tolerated by ClusterHealth (on
// top of the slowest response time) before it reports a clock skew
// error.
38 maxClockSkew = time.Minute
41 // Aggregator implements service.Handler. It handles "GET /_health/all"
42 // by checking the health of all configured services on the cluster
43 // and responding 200 if everything is healthy.
//
// ServeHTTP also answers "/_health/ping" with a fixed healthy body.
44 type Aggregator struct {
// httpClient is used for all outgoing ping and metrics requests. It is
// created by setup().
46 httpClient *http.Client
// timeout bounds each outgoing ping/metrics request via
// context.WithTimeout.
47 timeout arvados.Duration
// Cluster supplies the service list, the TLS "insecure" setting, and
// the ManagementToken used both to authorize incoming requests and to
// authenticate outgoing checks.
49 Cluster *arvados.Cluster
51 // If non-nil, Log is called after handling each request.
52 Log func(*http.Request, error)
54 // If non-nil, report clock skew on each health-check.
// The gauge is set to the observed skew in seconds.
55 MetricClockSkew prometheus.Gauge
// setup performs one-time initialization; it is invoked through
// agg.setupOnce by ServeHTTP and ClusterHealth.
//
// It builds agg.httpClient with a transport whose TLS certificate
// verification is disabled when Cluster.TLS.Insecure is true, and
// assigns defaultTimeout to agg.timeout.
// NOTE(review): the condition guarding the timeout assignment is not
// visible in this excerpt; presumably it only applies when no timeout
// was set beforehand -- confirm.
58 func (agg *Aggregator) setup() {
59 agg.httpClient = &http.Client{
60 Transport: &http.Transport{
61 TLSClientConfig: &tls.Config{
62 InsecureSkipVerify: agg.Cluster.TLS.Insecure,
67 // this is always the case, except in the test suite
68 agg.timeout = defaultTimeout
// CheckHealth returns an error value describing the aggregator's own
// health. Presumably required by the service.Handler interface that
// Aggregator implements -- TODO confirm; the body is not visible here.
72 func (agg *Aggregator) CheckHealth() error {
// Done returns a receive-only signalling channel. Presumably required
// by the service.Handler interface that Aggregator implements -- TODO
// confirm; the body is not visible here.
76 func (agg *Aggregator) Done() <-chan struct{} {
// ServeHTTP handles health requests.
//
// After one-time setup it sets a JSON Content-Type on the response and
// rejects requests that fail checkAuth with 401 Unauthorized. Authorized
// requests for "/_health/all" receive the JSON-encoded result of
// ClusterHealth(); "/_health/ping" receives healthyBody; any other path
// receives 404 Not Found.
80 func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
81 agg.setupOnce.Do(agg.setup)
// sendErr writes statusCode followed by a JSON object of the form
// {"error": "<message>"}.
82 sendErr := func(statusCode int, err error) {
83 resp.WriteHeader(statusCode)
84 json.NewEncoder(resp).Encode(map[string]string{"error": err.Error()})
90 resp.Header().Set("Content-Type", "application/json")
92 if !agg.checkAuth(req) {
93 sendErr(http.StatusUnauthorized, errUnauthorized)
96 if req.URL.Path == "/_health/all" {
97 json.NewEncoder(resp).Encode(agg.ClusterHealth())
98 } else if req.URL.Path == "/_health/ping" {
99 resp.Write(healthyBody)
101 sendErr(http.StatusNotFound, errNotFound)
// ClusterHealthResponse is the aggregate report returned by
// ClusterHealth and served as JSON at "/_health/all".
109 type ClusterHealthResponse struct {
110 // "OK" if all needed services are OK, otherwise "ERROR".
111 Health string `json:"health"`
113 // An entry for each known health check of each known instance
114 // of each needed component: "instance of service S on node N
115 // reports health-check C is OK."
// ClusterHealth keys each entry as "<service name>+<ping URL>".
116 Checks map[string]CheckResult `json:"checks"`
118 // An entry for each service type: "service S is OK." This
119 // exposes problems that can't be expressed in Checks, like
120 // "service S is needed, but isn't configured to run
122 Services map[arvados.ServiceName]ServiceHealth `json:"services"`
124 // Difference between min/max timestamps in individual
125 // health-check responses.
// This field has no json tag, so encoding/json emits it under the key
// "ClockSkew".
126 ClockSkew arvados.Duration
// Human-readable messages, one per problem found by ClusterHealth
// (failed checks, unconfigured services, clock skew, outdated config).
128 Errors []string `json:"errors"`
// CheckResult is the outcome of a single health check against one
// service URL.
131 type CheckResult struct {
// Health is the check verdict; the code in this file uses "OK",
// "ERROR", and "SKIP".
132 Health string `json:"health"`
// Error describes why the check did not succeed; omitted from JSON
// when empty.
133 Error string `json:"error,omitempty"`
// HTTPStatusCode and HTTPStatusText are copied from the ping
// response. Their tags name no key, so they are encoded under their Go
// field names and omitted when zero/empty.
134 HTTPStatusCode int `json:",omitempty"`
135 HTTPStatusText string `json:",omitempty"`
// Response is the decoded JSON body of the ping response.
136 Response map[string]interface{} `json:"response"`
// ResponseTime is the elapsed request time in seconds, formatted with
// six decimal places.
137 ResponseTime json.Number `json:"responseTime"`
// ClockTime is parsed from the ping response's Date header; it stays
// zero if the header is missing or unparseable.
138 ClockTime time.Time `json:"clockTime"`
// Metrics holds values scraped from the service's /metrics endpoint;
// it is excluded from JSON output.
139 Metrics Metrics `json:"-"`
// respTime is the raw elapsed request time, used by ClusterHealth to
// find the slowest response.
140 respTime time.Duration
// Metrics holds the config-source information that the metrics method
// extracts from a service's /metrics output.
143 type Metrics struct {
// ConfigSourceTimestamp is the value of the
// arvados_config_source_timestamp_seconds metric, converted from
// fractional Unix seconds.
144 ConfigSourceTimestamp time.Time
// ConfigSourceSHA256 is the sha256 label attached to that metric.
145 ConfigSourceSHA256 string
// ServiceHealth summarizes the health of one service type across all
// of its checked instances.
//
// Besides the values listed on the Health field, ClusterHealth
// initially records "MISSING" for a service until one of its checks
// reports "OK" or "SKIP".
148 type ServiceHealth struct {
149 Health string `json:"health"` // "OK", "ERROR", or "SKIP"
// ClusterHealth checks every service listed in agg.Cluster.Services
// and returns the aggregate report.
//
// For each service it probes every InternalURL (or the ExternalURL when
// there are no InternalURLs and the ExternalURL has a host), one
// goroutine per URL: the URL is pinged and, unless the ping result is
// "SKIP", its metrics are fetched as well. After the probes it adds
// errors for services whose health ended up neither "OK" nor "SKIP",
// for clock skew across responses exceeding maxClockSkew plus the
// slowest response time, and for services whose config hash differs
// from the most recently timestamped one.
153 func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
154 agg.setupOnce.Do(agg.setup)
155 resp := ClusterHealthResponse{
157 Checks: make(map[string]CheckResult),
158 Services: make(map[arvados.ServiceName]ServiceHealth),
162 wg := sync.WaitGroup{}
163 for svcName, svc := range agg.Cluster.Services.Map() {
164 // Ensure svc is listed in resp.Services.
166 if _, ok := resp.Services[svcName]; !ok {
167 resp.Services[svcName] = ServiceHealth{Health: "MISSING"}
// Collect the set of URLs to probe for this service.
171 checkURLs := map[arvados.URL]bool{}
172 for addr := range svc.InternalURLs {
173 checkURLs[addr] = true
// No InternalURLs: fall back to the ExternalURL if it has a host.
175 if len(checkURLs) == 0 && svc.ExternalURL.Host != "" {
176 checkURLs[svc.ExternalURL] = true
178 for addr := range checkURLs {
// Probe each URL in its own goroutine; svcName and addr are passed as
// arguments so each goroutine works on its own copies.
180 go func(svcName arvados.ServiceName, addr arvados.URL) {
182 var result CheckResult
183 pingURL, err := agg.pingURL(addr)
185 result = CheckResult{
190 result = agg.ping(pingURL)
// Metrics are fetched only for targets that were not skipped.
191 if result.Health != "SKIP" {
192 m, err := agg.metrics(pingURL)
// Keep an existing ping error rather than overwriting it with the
// metrics error.
193 if err != nil && result.Error == "" {
194 result.Error = "metrics: " + err.Error()
// Check entries are keyed by "<service name>+<ping URL>".
202 resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result
203 if result.Health == "OK" || result.Health == "SKIP" {
204 h := resp.Services[svcName]
206 if result.Health == "OK" || h.N == 1 {
207 // "" => "SKIP" or "OK"
209 h.Health = result.Health
211 resp.Services[svcName] = h
213 resp.Health = "ERROR"
214 resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: %s", svcName, result.Health, result.Error))
221 // Report ERROR if a needed service didn't fail any checks
222 // merely because it isn't configured to run anywhere.
223 for svcName, sh := range resp.Services {
225 case arvados.ServiceNameDispatchCloud,
226 arvados.ServiceNameDispatchLSF:
227 // ok to not run any given dispatcher
228 case arvados.ServiceNameHealth,
229 arvados.ServiceNameWorkbench1,
230 arvados.ServiceNameWorkbench2:
231 // typically doesn't have InternalURLs in config
233 if sh.Health != "OK" && sh.Health != "SKIP" {
234 resp.Health = "ERROR"
235 resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: no InternalURLs configured", svcName, sh.Health))
// Find the earliest and latest clock times reported across all checks,
// and the slowest response time.
241 var maxResponseTime time.Duration
242 var clockMin, clockMax time.Time
243 for _, result := range resp.Checks {
// A zero ClockTime means no usable Date header was obtained.
244 if result.ClockTime.IsZero() {
247 if clockMin.IsZero() || result.ClockTime.Before(clockMin) {
248 clockMin = result.ClockTime
250 if result.ClockTime.After(clockMax) {
251 clockMax = result.ClockTime
253 if result.respTime > maxResponseTime {
254 maxResponseTime = result.respTime
257 skew := clockMax.Sub(clockMin)
258 resp.ClockSkew = arvados.Duration(skew)
// The slowest response time is added to the threshold, since
// timestamps taken during a slow round of checks legitimately differ
// by up to that much.
259 if skew > maxClockSkew+maxResponseTime {
260 msg := fmt.Sprintf("clock skew detected: maximum timestamp spread is %s (exceeds warning threshold of %s)", resp.ClockSkew, arvados.Duration(maxClockSkew))
261 resp.Errors = append(resp.Errors, msg)
262 resp.Health = "ERROR"
264 if agg.MetricClockSkew != nil {
265 agg.MetricClockSkew.Set(skew.Seconds())
// Find the metrics entry with the most recent config source timestamp.
269 for _, result := range resp.Checks {
270 if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
271 newest = result.Metrics
// Any check that reported a config hash different from the newest one
// is considered outdated; checks with no hash are ignored.
274 var mismatches []string
275 for target, result := range resp.Checks {
276 if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
277 mismatches = append(mismatches, target)
280 for _, target := range mismatches {
281 msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
282 strings.TrimSuffix(target, "/_health/ping"),
283 resp.Checks[target].Metrics.ConfigSourceSHA256,
284 newest.ConfigSourceTimestamp.Format(time.RFC3339))
285 resp.Errors = append(resp.Errors, msg)
286 resp.Health = "ERROR"
291 func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) {
292 base := url.URL(svcURL)
293 return base.Parse("/_health/ping")
// ping sends an authenticated GET request to target and returns the
// outcome as a CheckResult.
//
// The result records the elapsed time (respTime and ResponseTime), the
// HTTP status, the decoded JSON response body, and the time from the
// response's Date header. Error is populated when the request fails,
// the body cannot be decoded, the status is not 200, or the body's
// "health" value is not "OK".
296 func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
299 result.respTime = time.Since(t0)
// ResponseTime is the elapsed time in seconds with six decimal places.
300 result.ResponseTime = json.Number(fmt.Sprintf("%.6f", result.respTime.Seconds()))
302 result.Health = "ERROR"
// The request is bounded by agg.timeout.
304 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
306 req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
308 result.Error = err.Error()
311 req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
313 // Avoid workbench1's redirect-http-to-https feature
314 req.Header.Set("X-Forwarded-Proto", "https")
316 resp, err := agg.httpClient.Do(req)
// A network-level failure (*net.OpError) against a local-host target
// is reported with its own CheckResult instead of the generic error
// path below.
317 if urlerr, ok := err.(*url.Error); ok {
318 if neterr, ok := urlerr.Err.(*net.OpError); ok && isLocalHost(target.Hostname()) {
319 result = CheckResult{
321 Error: neterr.Error(),
328 result.Error = err.Error()
331 result.HTTPStatusCode = resp.StatusCode
332 result.HTTPStatusText = resp.Status
333 err = json.NewDecoder(resp.Body).Decode(&result.Response)
335 result.Error = fmt.Sprintf("cannot decode response: %s", err)
336 } else if resp.StatusCode != http.StatusOK {
337 result.Error = fmt.Sprintf("HTTP %d %s", resp.StatusCode, resp.Status)
// A 200 response must also carry "health":"OK" in its JSON body.
338 } else if h, _ := result.Response["health"].(string); h != "OK" {
339 if e, ok := result.Response["error"].(string); ok && e != "" {
343 result.Error = fmt.Sprintf("health=%q in ping response", h)
// The parse error is deliberately ignored: a missing or malformed Date
// header leaves ClockTime zero, and ClusterHealth skips zero values.
348 result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
// reMetric matches a metrics line of the form
// `name{sha256="<hex>"} <number>`, capturing the metric name, the
// sha256 label value, and the numeric value (which may use exponent
// notation such as 1.6e+09).
352 var reMetric = regexp.MustCompile(`([a-z_]+){sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
// metrics fetches "/metrics" from the same scheme and host as pingURL,
// authenticating with the cluster's management token, and scans the
// response for the arvados_config_source_timestamp_seconds metric. It
// returns that metric's sha256 label and timestamp as a Metrics value.
//
// A non-200 response or a scanner failure is reported through err.
354 func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
355 metricsURL, err := pingURL.Parse("/metrics")
// The request is bounded by agg.timeout.
359 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
361 req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
365 req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
367 // Avoid workbench1's redirect-http-to-https feature
368 req.Header.Set("X-Forwarded-Proto", "https")
370 resp, err := agg.httpClient.Do(req)
373 } else if resp.StatusCode != http.StatusOK {
374 err = fmt.Errorf("%s: HTTP %d %s", metricsURL.String(), resp.StatusCode, resp.Status)
378 scanner := bufio.NewScanner(resp.Body)
// Only lines matching reMetric whose metric name is
// arvados_config_source_timestamp_seconds are of interest.
380 m := reMetric.FindSubmatch(scanner.Bytes())
381 if len(m) != 4 || string(m[1]) != "arvados_config_source_timestamp_seconds" {
384 result.ConfigSourceSHA256 = string(m[2])
// The metric value is fractional Unix seconds; a parse failure is
// ignored and yields the Unix epoch.
385 unixtime, _ := strconv.ParseFloat(string(m[3]), 64)
386 result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
388 if err = scanner.Err(); err != nil {
389 err = fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
// isLocalHost reports whether host is an easily recognizable local
// address: a loopback IP (127.x.x.x or ::1), a wildcard IP (0.0.0.0
// or its IPv6 equivalent ::), or the name "localhost" (compared
// case-insensitively).
//
// host must be a bare host name or IP literal with no port and no
// surrounding brackets, e.g. the result of (*url.URL).Hostname().
// Anything else, including the empty string, yields false.
func isLocalHost(host string) bool {
	if strings.EqualFold(host, "localhost") {
		return true
	}
	ip := net.ParseIP(host)
	if ip == nil {
		// Not an IP literal, and not "localhost" either.
		return false
	}
	// To4 also unwraps IPv4-mapped IPv6 addresses, so both "0.0.0.0"
	// and "::ffff:0.0.0.0" count as the IPv4 wildcard.
	wildcard := bytes.Equal(ip.To4(), []byte{0, 0, 0, 0}) || ip.Equal(net.IPv6unspecified)
	return ip.IsLoopback() || wildcard
}
// checkAuth reports whether req is authorized, by looking for the
// cluster's ManagementToken among the credentials extracted from the
// request. Empty tokens never match, so an empty credential cannot
// satisfy an unset ManagementToken.
// NOTE(review): the token is compared with ==, which is not
// constant-time; consider crypto/subtle.ConstantTimeCompare if timing
// side channels matter here.
402 func (agg *Aggregator) checkAuth(req *http.Request) bool {
403 creds := auth.CredentialsFromRequest(req)
404 for _, token := range creds.Tokens {
405 if token != "" && token == agg.Cluster.ManagementToken {
// errSilent is a sentinel error with an empty message; RunCommand does
// not print it.
412 var errSilent = errors.New("")
// CheckCommand is the cmd.Handler that runs a cluster health check
// from the command line.
414 var CheckCommand cmd.Handler = checkCommand{}
// checkCommand is the stateless implementation behind CheckCommand.
416 type checkCommand struct{}
// RunCommand implements cmd.Handler. It installs a JSON logger writing
// to stderr at level "info" in a fresh context, delegates to run, and
// writes any resulting error other than errSilent to stdout.
418 func (ccmd checkCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
419 logger := ctxlog.New(stderr, "json", "info")
420 ctx := ctxlog.Context(context.Background(), logger)
421 err := ccmd.run(ctx, prog, args, stdin, stdout, stderr)
// errSilent means the failure has already been reported.
423 if err != errSilent {
424 fmt.Fprintln(stdout, err.Error())
// run implements the health check command: it parses flags (config
// loader flags plus -version, -timeout, and -yaml), loads the cluster
// configuration, runs Aggregator.ClusterHealth with the requested
// timeout, and reports the result.
//
// Per the -yaml flag's description, the full report is emitted as YAML
// when that flag is given; otherwise, when health is not "OK", each
// entry of resp.Errors is written to stdout and "health check failed"
// to stderr.
431 func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) error {
432 flags := flag.NewFlagSet("", flag.ContinueOnError)
433 flags.SetOutput(stderr)
// Config loader messages go to stderr as text at level "info".
434 loader := config.NewLoader(stdin, ctxlog.New(stderr, "text", "info"))
435 loader.SetupFlags(flags)
436 versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
437 timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
438 outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode shows errors as plain text, is silent on success)")
439 if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
440 // cmd.ParseFlags already reported the error
442 } else if *versionFlag {
443 cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
446 cfg, err := loader.Load()
// An empty cluster ID is passed to GetCluster.
450 cluster, err := cfg.GetCluster("")
// Once the config is loaded, switch to a logger using the cluster's
// configured format and level, tagged with the cluster ID.
454 logger := ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
455 "ClusterID": cluster.ClusterID,
457 ctx = ctxlog.Context(ctx, logger)
458 agg := Aggregator{Cluster: cluster, timeout: arvados.Duration(*timeout)}
459 resp := agg.ClusterHealth()
461 y, err := yaml.Marshal(resp)
466 if resp.Health != "OK" {
471 if resp.Health != "OK" {
472 for _, msg := range resp.Errors {
473 fmt.Fprintln(stdout, msg)
475 fmt.Fprintln(stderr, "health check failed")