1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
26 "git.arvados.org/arvados.git/lib/cmd"
27 "git.arvados.org/arvados.git/lib/config"
28 "git.arvados.org/arvados.git/sdk/go/arvados"
29 "git.arvados.org/arvados.git/sdk/go/auth"
30 "git.arvados.org/arvados.git/sdk/go/ctxlog"
31 "github.com/ghodss/yaml"
32 "github.com/prometheus/client_golang/prometheus"
33 "github.com/sirupsen/logrus"
// defaultTimeout is the per-request timeout assigned by
// Aggregator.setup and the default for the check command's -timeout
// flag.
37 defaultTimeout = arvados.Duration(2 * time.Second)
// maxClockSkew is the base tolerance for the spread between the
// timestamps reported by different services; ClusterHealth adds the
// slowest response time to it before flagging an error.
38 maxClockSkew = time.Minute
41 // Aggregator implements service.Handler. It handles "GET /_health/all"
42 // by checking the health of all configured services on the cluster
43 // and responding 200 if everything is healthy.
//
// ServeHTTP also answers "/_health/ping" with a fixed healthy body.
44 type Aggregator struct {
// HTTP client used for the ping and metrics requests; created by
// setup().
46 httpClient *http.Client
// Timeout applied to each individual ping or metrics request.
47 timeout arvados.Duration
// Cluster configuration. The aggregator reads the service list,
// the management token, and the TLS.Insecure setting from it.
49 Cluster *arvados.Cluster
51 // If non-nil, Log is called after handling each request.
52 Log func(*http.Request, error)
54 // If non-nil, report clock skew on each health-check.
55 MetricClockSkew prometheus.Gauge
// setup builds the HTTP client used for health checks and assigns the
// default request timeout. It is run through agg.setupOnce by both
// ServeHTTP and ClusterHealth.
58 func (agg *Aggregator) setup() {
59 agg.httpClient = &http.Client{
60 Transport: &http.Transport{
61 TLSClientConfig: &tls.Config{
// Certificate verification is skipped only when the cluster
// config sets TLS.Insecure.
62 InsecureSkipVerify: agg.Cluster.TLS.Insecure,
// NOTE(review): the condition guarding this assignment is not
// visible here; presumably it applies only when no timeout was
// preset (the check command sets one explicitly) -- confirm.
67 // this is always the case, except in the test suite
68 agg.timeout = defaultTimeout
72 func (agg *Aggregator) CheckHealth() error {
76 func (agg *Aggregator) Done() <-chan struct{} {
// ServeHTTP implements http.Handler. After one-time setup it checks
// the request's credentials (see checkAuth) and then serves
// "/_health/all" (the JSON-encoded ClusterHealth report) or
// "/_health/ping" (the fixed healthyBody). Unauthorized requests and
// unknown paths get a JSON error body.
80 func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
81 agg.setupOnce.Do(agg.setup)
// sendErr writes statusCode followed by a JSON body of the form
// {"error": "<message>"}.
82 sendErr := func(statusCode int, err error) {
83 resp.WriteHeader(statusCode)
84 json.NewEncoder(resp).Encode(map[string]string{"error": err.Error()})
// Set before the auth check and routing below, so error bodies are
// labelled as JSON too.
90 resp.Header().Set("Content-Type", "application/json")
92 if !agg.checkAuth(req) {
93 sendErr(http.StatusUnauthorized, errUnauthorized)
96 if req.URL.Path == "/_health/all" {
97 json.NewEncoder(resp).Encode(agg.ClusterHealth())
98 } else if req.URL.Path == "/_health/ping" {
99 resp.Write(healthyBody)
101 sendErr(http.StatusNotFound, errNotFound)
// ClusterHealthResponse is the report produced by
// Aggregator.ClusterHealth and served as JSON at "/_health/all".
109 type ClusterHealthResponse struct {
110 // "OK" if all needed services are OK, otherwise "ERROR".
113 // An entry for each known health check of each known instance
114 // of each needed component: "instance of service S on node N
115 // reports health-check C is OK."
//
// Keys have the form "<service name>+<ping URL>".
116 Checks map[string]CheckResult
118 // An entry for each service type: "service S is OK." This
119 // exposes problems that can't be expressed in Checks, like
120 // "service S is needed, but isn't configured to run
122 Services map[arvados.ServiceName]ServiceHealth
124 // Difference between min/max timestamps in individual
125 // health-check responses.
126 ClockSkew arvados.Duration
// CheckResult is the outcome of one health check against one service
// URL, as filled in by Aggregator.ping.
131 type CheckResult struct {
// Description of the failure; empty (and omitted from JSON) when
// no error was recorded.
133 Error string `json:",omitempty"`
// Status code of the ping response, when one was received.
134 HTTPStatusCode int `json:",omitempty"`
// Decoded JSON body of the ping response.
135 Response map[string]interface{} `json:",omitempty"`
// Elapsed time of the ping in seconds, formatted with six decimal
// places.
136 ResponseTime json.Number
138 Server string // "Server" header in http response
// Unformatted elapsed time of the ping; ClusterHealth uses the
// largest value as extra tolerance in its clock-skew check.
140 respTime time.Duration
// Metrics holds the values Aggregator.metrics extracts from a
// service's "/metrics" endpoint.
143 type Metrics struct {
// Value of the arvados_config_source_timestamp_seconds metric,
// converted to a time.Time.
144 ConfigSourceTimestamp time.Time
// The sha256 label of that same metric (lowercase hex).
145 ConfigSourceSHA256 string
// ServiceHealth summarizes the health of one service type across all
// of its checked URLs.
149 type ServiceHealth struct {
150 Health string // "OK", "ERROR", or "SKIP"; ClusterHealth initializes it to "MISSING" until a check reports OK or SKIP
// ClusterHealth checks every service listed in the cluster config and
// returns an aggregated report. It:
//
//   - pings each service URL (and fetches its metrics) in its own
//     goroutine, recording one CheckResult per URL;
//   - flags needed services that never reported OK or SKIP;
//   - flags clock skew between the responding hosts;
//   - flags services whose config file hash differs from the most
//     recently timestamped one;
//   - flags services running a different version than this program.
154 func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
155 agg.setupOnce.Do(agg.setup)
156 resp := ClusterHealthResponse{
158 Checks: make(map[string]CheckResult),
159 Services: make(map[arvados.ServiceName]ServiceHealth),
163 wg := sync.WaitGroup{}
164 for svcName, svc := range agg.Cluster.Services.Map() {
165 // Ensure svc is listed in resp.Services.
167 if _, ok := resp.Services[svcName]; !ok {
168 resp.Services[svcName] = ServiceHealth{Health: "MISSING"}
// Check every InternalURL; fall back to the ExternalURL only
// when no InternalURLs are configured.
172 checkURLs := map[arvados.URL]bool{}
173 for addr := range svc.InternalURLs {
174 checkURLs[addr] = true
176 if len(checkURLs) == 0 && svc.ExternalURL.Host != "" {
177 checkURLs[svc.ExternalURL] = true
179 for addr := range checkURLs {
// svcName and addr are passed as arguments so each goroutine
// works on its own copies.
181 go func(svcName arvados.ServiceName, addr arvados.URL) {
183 var result CheckResult
184 pingURL, err := agg.pingURL(addr)
186 result = CheckResult{
191 result = agg.ping(pingURL)
// Metrics are fetched for every target that was not skipped. A
// metrics error is reported only if the ping itself did not
// already record an error.
192 if result.Health != "SKIP" {
193 m, err := agg.metrics(pingURL)
194 if err != nil && result.Error == "" {
195 result.Error = "metrics: " + err.Error()
// NOTE(review): resp is shared by all goroutines; the locking
// that must protect these writes is not visible here -- confirm.
203 resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result
204 if result.Health == "OK" || result.Health == "SKIP" {
205 h := resp.Services[svcName]
// An OK result always sets the service's health; a SKIP result
// sets it only when h.N == 1.
207 if result.Health == "OK" || h.N == 1 {
208 // "" => "SKIP" or "OK"
210 h.Health = result.Health
212 resp.Services[svcName] = h
214 resp.Health = "ERROR"
215 resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: %s", svcName, result.Health, result.Error))
222 // Report ERROR if a needed service didn't fail any checks
223 // merely because it isn't configured to run anywhere.
224 for svcName, sh := range resp.Services {
226 case arvados.ServiceNameDispatchCloud,
227 arvados.ServiceNameDispatchLSF,
228 arvados.ServiceNameDispatchSLURM:
229 // ok to not run any given dispatcher
230 case arvados.ServiceNameHealth,
231 arvados.ServiceNameWorkbench1,
232 arvados.ServiceNameWorkbench2:
233 // typically doesn't have InternalURLs in config
235 if sh.Health != "OK" && sh.Health != "SKIP" {
236 resp.Health = "ERROR"
237 resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: no InternalURLs configured", svcName, sh.Health))
243 // Check for clock skew between hosts
244 var maxResponseTime time.Duration
245 var clockMin, clockMax time.Time
246 for _, result := range resp.Checks {
// Results with a zero ClockTime (no parseable Date header) are
// handled separately from the min/max tracking below.
247 if result.ClockTime.IsZero() {
250 if clockMin.IsZero() || result.ClockTime.Before(clockMin) {
251 clockMin = result.ClockTime
253 if result.ClockTime.After(clockMax) {
254 clockMax = result.ClockTime
256 if result.respTime > maxResponseTime {
257 maxResponseTime = result.respTime
260 skew := clockMax.Sub(clockMin)
261 resp.ClockSkew = arvados.Duration(skew)
// The slowest response time is added to the tolerance.
// NOTE(review): the message below reports maxClockSkew alone as the
// threshold, not maxClockSkew+maxResponseTime.
262 if skew > maxClockSkew+maxResponseTime {
263 msg := fmt.Sprintf("clock skew detected: maximum timestamp spread is %s (exceeds warning threshold of %s)", resp.ClockSkew, arvados.Duration(maxClockSkew))
264 resp.Errors = append(resp.Errors, msg)
265 resp.Health = "ERROR"
267 if agg.MetricClockSkew != nil {
268 agg.MetricClockSkew.Set(skew.Seconds())
271 // Check for mismatched config files
// newest ends up as the Metrics with the latest
// ConfigSourceTimestamp among all checks.
273 for _, result := range resp.Checks {
274 if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
275 newest = result.Metrics
278 var mismatches []string
279 for target, result := range resp.Checks {
// Checks that reported no config hash are not counted as
// mismatches.
280 if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
281 mismatches = append(mismatches, target)
284 for _, target := range mismatches {
285 msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
286 strings.TrimSuffix(target, "/_health/ping"),
287 resp.Checks[target].Metrics.ConfigSourceSHA256,
288 newest.ConfigSourceTimestamp.Format(time.RFC3339))
289 resp.Errors = append(resp.Errors, msg)
290 resp.Health = "ERROR"
293 // Check for services running a different version than we are.
294 for target, result := range resp.Checks {
// Checks that reported no version are not compared.
295 if result.Metrics.Version != "" && !sameVersion(result.Metrics.Version, cmd.Version.String()) {
296 msg := fmt.Sprintf("version mismatch: %s is running %s -- expected %s",
297 strings.TrimSuffix(target, "/_health/ping"),
298 result.Metrics.Version,
299 cmd.Version.String())
300 resp.Errors = append(resp.Errors, msg)
301 resp.Health = "ERROR"
307 func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) {
308 base := url.URL(svcURL)
309 return base.Parse("/_health/ping")
// ping sends an authenticated GET request to target and returns the
// outcome as a CheckResult. The result records the elapsed time, the
// HTTP status code, the decoded JSON body, and the "Date" and
// "Server" response headers.
312 func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
// Elapsed time is stored both as a raw duration and as seconds
// formatted with six decimal places.
315 result.respTime = time.Since(t0)
316 result.ResponseTime = json.Number(fmt.Sprintf("%.6f", result.respTime.Seconds()))
// Health starts out as "ERROR".
318 result.Health = "ERROR"
320 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
322 req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
324 result.Error = err.Error()
327 req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
329 // Avoid workbench1's redirect-http-to-https feature
330 req.Header.Set("X-Forwarded-Proto", "https")
332 resp, err := agg.httpClient.Do(req)
// A network-level error while contacting a local address (see
// isLocalHost) replaces the result with a separate CheckResult.
// NOTE(review): its Health value is not visible here; presumably
// "SKIP", which is what ClusterHealth tests for -- confirm.
333 if urlerr, ok := err.(*url.Error); ok {
334 if neterr, ok := urlerr.Err.(*net.OpError); ok && isLocalHost(target.Hostname()) {
335 result = CheckResult{
337 Error: neterr.Error(),
344 result.Error = err.Error()
347 result.HTTPStatusCode = resp.StatusCode
348 err = json.NewDecoder(resp.Body).Decode(&result.Response)
350 result.Error = fmt.Sprintf("cannot decode response: %s", err)
351 } else if resp.StatusCode != http.StatusOK {
// NOTE(review): resp.Status already begins with the numeric code
// (e.g. "404 Not Found"), so this renders as
// "HTTP 404 404 Not Found".
352 result.Error = fmt.Sprintf("HTTP %d %s", resp.StatusCode, resp.Status)
353 } else if h, _ := result.Response["health"].(string); h != "OK" {
354 if e, ok := result.Response["error"].(string); ok && e != "" {
358 result.Error = fmt.Sprintf("health=%q in ping response", h)
// The parse error is discarded: an absent or malformed Date header
// leaves ClockTime at its zero value.
363 result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
364 result.Server = resp.Header.Get("Server")
// reConfigMetric matches the arvados_config_source_timestamp_seconds
// metric line; group 1 is the sha256 label (lowercase hex) and group
// 2 is the metric value.
369 reConfigMetric = regexp.MustCompile(`arvados_config_source_timestamp_seconds{sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
// reVersionMetric matches the arvados_version_running metric line
// with value 1; group 1 is the version label.
370 reVersionMetric = regexp.MustCompile(`arvados_version_running{version="([^"]+)"} 1`)
// metrics fetches "/metrics" from the same host as pingURL and scans
// the response for the config-source metric (sha256 and timestamp)
// and the running-version metric, returning what it finds as a
// Metrics value.
373 func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
374 metricsURL, err := pingURL.Parse("/metrics")
378 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
380 req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
384 req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
386 // Avoid workbench1's redirect-http-to-https feature
387 req.Header.Set("X-Forwarded-Proto", "https")
389 resp, err := agg.httpClient.Do(req)
392 } else if resp.StatusCode != http.StatusOK {
// NOTE(review): resp.Status already begins with the numeric code,
// so the code appears twice in this message.
393 err = fmt.Errorf("%s: HTTP %d %s", metricsURL.String(), resp.StatusCode, resp.Status)
397 scanner := bufio.NewScanner(resp.Body)
399 if m := reConfigMetric.FindSubmatch(scanner.Bytes()); len(m) == 3 && len(m[1]) > 0 {
400 result.ConfigSourceSHA256 = string(m[1])
// The ParseFloat error is ignored; an unparseable value yields
// unixtime 0. The value is in seconds and is converted to
// microseconds for time.UnixMicro.
401 unixtime, _ := strconv.ParseFloat(string(m[2]), 64)
402 result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
403 } else if m = reVersionMetric.FindSubmatch(scanner.Bytes()); len(m) == 2 && len(m[1]) > 0 {
404 result.Version = string(m[1])
407 if err = scanner.Err(); err != nil {
408 err = fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
// isLocalHost reports whether host is an easily recognizable local
// address: 0.0.0.0, 127.x.x.x, ::1, or "localhost" (compared
// case-insensitively).
//
// host must be a bare hostname or IP literal with no port and no
// brackets, such as the result of url.URL.Hostname. No name
// resolution is done, so other names that resolve to a loopback
// address are not recognized.
func isLocalHost(host string) bool {
	if strings.EqualFold(host, "localhost") {
		return true
	}
	// net.ParseIP returns nil when host is not an IP literal. The IP
	// methods used below accept a nil IP and report false / nil, so no
	// explicit nil check is needed.
	ip := net.ParseIP(host)
	return ip.IsLoopback() || bytes.Equal(ip.To4(), []byte{0, 0, 0, 0})
}
// checkAuth examines the tokens presented with req (as extracted by
// auth.CredentialsFromRequest) and looks for one that is non-empty
// and equal to the cluster's ManagementToken. Because empty tokens
// are never accepted, an empty configured ManagementToken cannot be
// matched.
//
// NOTE(review): the return statements are not visible here;
// presumably true on a match and false otherwise -- confirm. The
// comparison uses ==, which is not constant-time.
421 func (agg *Aggregator) checkAuth(req *http.Request) bool {
422 creds := auth.CredentialsFromRequest(req)
423 for _, token := range creds.Tokens {
424 if token != "" && token == agg.Cluster.ManagementToken {
// errSilent is a sentinel error with an empty message. RunCommand
// compares against it to avoid printing the error to stderr.
431 var errSilent = errors.New("")
// CheckCommand is the cmd.Handler that runs a cluster health check
// from the command line.
433 var CheckCommand cmd.Handler = checkCommand{}
// checkCommand implements CheckCommand; it carries no state.
435 type checkCommand struct{}
// RunCommand implements cmd.Handler. It sets up a JSON logger on
// stderr, delegates to run, and prints the resulting error to stderr
// unless it is errSilent.
//
// NOTE(review): the returned exit codes are not visible here.
437 func (ccmd checkCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
438 logger := ctxlog.New(stderr, "json", "info")
439 ctx := ctxlog.Context(context.Background(), logger)
440 err := ccmd.run(ctx, prog, args, stdin, stdout, stderr)
442 if err != errSilent {
443 fmt.Fprintln(stderr, err.Error())
// run parses the command-line flags, loads the cluster config, runs
// Aggregator.ClusterHealth with the requested timeout, and reports
// the outcome: either the full report as YAML (-yaml) or the error
// messages / a "health check OK" line on stderr.
450 func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) error {
451 flags := flag.NewFlagSet("", flag.ContinueOnError)
452 flags.SetOutput(stderr)
// The config loader registers its own flags on the same flag set.
453 loader := config.NewLoader(stdin, ctxlog.New(stderr, "text", "info"))
454 loader.SetupFlags(flags)
455 versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
456 timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
457 quiet := flags.Bool("quiet", false, "Silent on success (suppress 'health check OK' message on stderr)")
458 outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode prints 'health check OK' or plain text errors)")
459 if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
460 // cmd.ParseFlags already reported the error
462 } else if *versionFlag {
463 cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
466 cfg, err := loader.Load()
// An empty cluster ID is passed to GetCluster.
470 cluster, err := cfg.GetCluster("")
// From here on, log using the cluster's configured format and level.
474 logger := ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
475 "ClusterID": cluster.ClusterID,
477 ctx = ctxlog.Context(ctx, logger)
// The timeout is set explicitly from the -timeout flag.
478 agg := Aggregator{Cluster: cluster, timeout: arvados.Duration(*timeout)}
479 resp := agg.ClusterHealth()
481 y, err := yaml.Marshal(resp)
486 if resp.Health != "OK" {
491 if resp.Health != "OK" {
492 for _, msg := range resp.Errors {
493 fmt.Fprintln(stderr, msg)
495 fmt.Fprintln(stderr, "health check failed")
499 fmt.Fprintln(stderr, "health check OK")
var (
	// reGoVersion matches a trailing Go toolchain suffix such as
	// " (go1.19.3)".
	reGoVersion = regexp.MustCompile(` \(go\d+([\d.])*\)$`)
	// reDevVersion matches a trailing development-build suffix such as
	// "~dev20230102".
	reDevVersion = regexp.MustCompile(`~dev\d+$`)
)

// sameVersion reports whether version strings a and b should be
// treated as the same release.
//
// A trailing " (go1.2.3)" suffix is ignored on both sides. This
// allows us to recognize a non-Go (rails) service as the same
// version as a Go service.
//
// After that, the versions match when either
//   - neither has a "~devNNN" suffix and they are identical, or
//   - both have a "~devNNN" suffix and they are identical up to that
//     suffix (the digits after "~dev" may differ).
//
// A development build never matches a non-development build.
func sameVersion(a, b string) bool {
	// Strip " (go1.2.3)" suffix.
	a = reGoVersion.ReplaceAllLiteralString(a, "")
	b = reGoVersion.ReplaceAllLiteralString(b, "")

	// Strip "~devNNN" suffix, keeping the originals so we can tell
	// whether each side had one.
	aBase := reDevVersion.ReplaceAllLiteralString(a, "")
	bBase := reDevVersion.ReplaceAllLiteralString(b, "")
	aIsDev := a != aBase
	bIsDev := b != bBase

	return aBase == bBase && aIsDev == bIsDev
}