1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
26 "git.arvados.org/arvados.git/lib/cmd"
27 "git.arvados.org/arvados.git/lib/config"
28 "git.arvados.org/arvados.git/sdk/go/arvados"
29 "git.arvados.org/arvados.git/sdk/go/auth"
30 "git.arvados.org/arvados.git/sdk/go/ctxlog"
31 "github.com/ghodss/yaml"
32 "github.com/sirupsen/logrus"
// defaultTimeout is the default of the -timeout flag in checkCommand.run
// and is assigned to agg.timeout in setup. agg.timeout bounds each
// individual ping and metrics request made by the aggregator.
35 const defaultTimeout = arvados.Duration(2 * time.Second)
37 // Aggregator implements service.Handler. It handles "GET /_health/all"
38 // by checking the health of all configured services on the cluster
39 // and responding 200 if everything is healthy.
40 type Aggregator struct {
// httpClient is built by setup; its TLS verification follows
// Cluster.TLS.Insecure.
42 httpClient *http.Client
// timeout bounds each ping and metrics request (see ping, metrics).
43 timeout arvados.Duration
// Cluster supplies the services to probe and the ManagementToken used
// both to authenticate incoming requests (checkAuth) and to call each
// service's health and metrics endpoints.
45 Cluster *arvados.Cluster
47 // If non-nil, Log is called after handling each request.
48 Log func(*http.Request, error)
// setup initializes the shared HTTP client (honoring
// Cluster.TLS.Insecure) and the request timeout. It is invoked through
// agg.setupOnce.Do from both ServeHTTP and ClusterHealth, so it runs once
// per Aggregator.
51 func (agg *Aggregator) setup() {
52 agg.httpClient = &http.Client{
53 Transport: &http.Transport{
54 TLSClientConfig: &tls.Config{
55 InsecureSkipVerify: agg.Cluster.TLS.Insecure,
60 // this is always the case, except in the test suite
61 agg.timeout = defaultTimeout
// CheckHealth is part of the service.Handler interface that Aggregator
// implements.
65 func (agg *Aggregator) CheckHealth() error {
// Done is part of the service.Handler interface that Aggregator
// implements.
69 func (agg *Aggregator) Done() <-chan struct{} {
// ServeHTTP serves the aggregator's endpoints. Every request must carry
// the cluster ManagementToken (see checkAuth), otherwise it is rejected
// with 401 and errUnauthorized. "/_health/all" responds with the
// JSON-encoded ClusterHealth report, "/_health/ping" with healthyBody,
// and any other path with 404 and errNotFound. Error bodies are JSON
// objects of the form {"error": "..."}.
//
// NOTE(review): the type comment says the response is 200 only when
// everything is healthy, but no non-200 status is written for an
// unhealthy /_health/all report in the code visible here. Confirm.
73 func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
74 agg.setupOnce.Do(agg.setup)
// sendErr writes statusCode followed by a JSON {"error": ...} body.
75 sendErr := func(statusCode int, err error) {
76 resp.WriteHeader(statusCode)
77 json.NewEncoder(resp).Encode(map[string]string{"error": err.Error()})
// Set before any WriteHeader/Write call below so that it applies to
// every response, including errors sent by sendErr.
83 resp.Header().Set("Content-Type", "application/json")
85 if !agg.checkAuth(req) {
86 sendErr(http.StatusUnauthorized, errUnauthorized)
89 if req.URL.Path == "/_health/all" {
90 json.NewEncoder(resp).Encode(agg.ClusterHealth())
91 } else if req.URL.Path == "/_health/ping" {
92 resp.Write(healthyBody)
94 sendErr(http.StatusNotFound, errNotFound)
// ClusterHealthResponse is the report produced by ClusterHealth and
// served as JSON at /_health/all.
102 type ClusterHealthResponse struct {
103 // "OK" if all needed services are OK, otherwise "ERROR".
104 Health string `json:"health"`
106 // An entry for each known health check of each known instance
107 // of each needed component: "instance of service S on node N
108 // reports health-check C is OK."
109 Checks map[string]CheckResult `json:"checks"`
111 // An entry for each service type: "service S is OK." This
112 // exposes problems that can't be expressed in Checks, like
113 // "service S is needed, but isn't configured to run
// anywhere."
115 Services map[arvados.ServiceName]ServiceHealth `json:"services"`
// One human-readable message per problem found: failed checks,
// needed services with no passing check, and outdated configs.
117 Errors []string `json:"errors"`
// CheckResult is the outcome of probing one instance of one service.
120 type CheckResult struct {
// "OK", "ERROR", or "SKIP" (the values ClusterHealth inspects).
121 Health string `json:"health"`
// Description of what went wrong, if anything.
122 Error string `json:"error,omitempty"`
// Status code and status line of the ping response, when one was
// received.
123 HTTPStatusCode int `json:",omitempty"`
124 HTTPStatusText string `json:",omitempty"`
// The JSON body of the ping response, decoded as an object.
125 Response map[string]interface{} `json:"response"`
// Elapsed time of the ping, in seconds, formatted with 6 decimals.
126 ResponseTime json.Number `json:"responseTime"`
// Config-source metrics used by ClusterHealth to detect outdated
// configs; not included in the JSON report.
127 Metrics Metrics `json:"-"`
// Metrics holds the values parsed from a service's
// arvados_config_source_timestamp_seconds metric. The timestamp comes
// from the sample value and the hash from its sha256 label.
130 type Metrics struct {
131 ConfigSourceTimestamp time.Time
132 ConfigSourceSHA256 string
// ServiceHealth summarizes the checks of all instances of one service
// type.
135 type ServiceHealth struct {
136 Health string `json:"health"` // "OK", "SKIP", or "MISSING" (no passing check), as set by ClusterHealth; NOTE(review): "ERROR" is not assigned in the visible code. Confirm whether it can occur.
// ClusterHealth probes every instance of every configured service
// concurrently and aggregates the results into a ClusterHealthResponse.
//
// Each service is probed at each of its InternalURLs, or at its
// ExternalURL when it has no InternalURLs. resp.Health is set to "ERROR"
// in three cases:
//   - a probe fails;
//   - a service that is expected to run has no passing probe;
//   - an instance reports a config file hash that differs from the hash
//     reported alongside the newest config source timestamp.
// A human-readable explanation of each problem is appended to
// resp.Errors.
140 func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
141 agg.setupOnce.Do(agg.setup)
142 resp := ClusterHealthResponse{
144 Checks: make(map[string]CheckResult),
145 Services: make(map[arvados.ServiceName]ServiceHealth),
149 wg := sync.WaitGroup{}
150 for svcName, svc := range agg.Cluster.Services.Map() {
151 // Ensure svc is listed in resp.Services.
153 if _, ok := resp.Services[svcName]; !ok {
154 resp.Services[svcName] = ServiceHealth{Health: "MISSING"}
// Collect the distinct probe targets: InternalURLs, falling back to
// the ExternalURL for services configured without InternalURLs.
158 checkURLs := map[arvados.URL]bool{}
159 for addr := range svc.InternalURLs {
160 checkURLs[addr] = true
162 if len(checkURLs) == 0 && svc.ExternalURL.Host != "" {
163 checkURLs[svc.ExternalURL] = true
165 for addr := range checkURLs {
// Probe each target in its own goroutine. svcName and addr are taken
// as parameters, shadowing the loop variables.
167 go func(svcName arvados.ServiceName, addr arvados.URL) {
169 var result CheckResult
170 pingURL, err := agg.pingURL(addr)
172 result = CheckResult{
177 result = agg.ping(pingURL)
// Config metrics are only fetched from instances that were not skipped.
178 if result.Health != "SKIP" {
179 m, err := agg.metrics(pingURL)
// A metrics failure is recorded only if the ping reported no error.
180 if err != nil && result.Error == "" {
181 result.Error = "metrics: " + err.Error()
// NOTE(review): resp is shared by all probe goroutines. The lock that
// serializes these map writes is not visible in this excerpt. Confirm.
189 resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result
190 if result.Health == "OK" || result.Health == "SKIP" {
191 h := resp.Services[svcName]
// Presumably h.N counts passing probes (its increment is not visible
// here): the first passing probe replaces "MISSING", and any "OK"
// overrides an earlier "SKIP".
193 if result.Health == "OK" || h.N == 1 {
194 // "" => "SKIP" or "OK"
196 h.Health = result.Health
198 resp.Services[svcName] = h
200 resp.Health = "ERROR"
201 resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: %s", svcName, result.Health, result.Error))
208 // Report ERROR if a needed service didn't fail any checks
209 // merely because it isn't configured to run anywhere.
210 for svcName, sh := range resp.Services {
212 case arvados.ServiceNameDispatchCloud,
213 arvados.ServiceNameDispatchLSF:
214 // ok to not run any given dispatcher
215 case arvados.ServiceNameHealth,
216 arvados.ServiceNameWorkbench1,
217 arvados.ServiceNameWorkbench2:
218 // typically doesn't have InternalURLs in config
220 if sh.Health != "OK" && sh.Health != "SKIP" {
221 resp.Health = "ERROR"
222 resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: no InternalURLs configured", svcName, sh.Health))
// Find the metrics carrying the newest config source timestamp.
// NOTE(review): if several instances share that timestamp but report
// different hashes, which one wins depends on map iteration order.
229 for _, result := range resp.Checks {
230 if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
231 newest = result.Metrics
// Flag every instance whose (non-empty) config hash differs from the
// newest one.
234 var mismatches []string
235 for target, result := range resp.Checks {
236 if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
237 mismatches = append(mismatches, target)
240 for _, target := range mismatches {
// Trimming the ping path leaves "service+base URL" in the message.
241 msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
242 strings.TrimSuffix(target, "/_health/ping"),
243 resp.Checks[target].Metrics.ConfigSourceSHA256,
244 newest.ConfigSourceTimestamp.Format(time.RFC3339))
245 resp.Errors = append(resp.Errors, msg)
246 resp.Health = "ERROR"
// pingURL returns the /_health/ping URL on the same scheme and host as
// svcURL. The reference path is absolute, so it replaces any path that
// svcURL already has.
251 func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) {
252 base := url.URL(svcURL)
253 return base.Parse("/_health/ping")
// ping sends a GET request to target, authenticated with the cluster
// ManagementToken and bounded by agg.timeout, and records the outcome in
// a CheckResult.
//
// Health starts as "ERROR", and ResponseTime is set to the elapsed time
// in seconds. A *net.OpError while contacting an easily recognizable
// local address (see isLocalHost) replaces the whole result. Presumably
// this uses Health "SKIP", which ClusterHealth treats as passing; TODO
// confirm against the fields not visible here. Otherwise the HTTP status
// is recorded and the JSON body is decoded into Response. An
// undecodable body, a non-200 status, or a "health" value other than
// "OK" is described in Error.
256 func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
259 result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
261 result.Health = "ERROR"
263 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
265 req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
267 result.Error = err.Error()
270 req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
272 // Avoid workbench1's redirect-http-to-https feature
273 req.Header.Set("X-Forwarded-Proto", "https")
275 resp, err := agg.httpClient.Do(req)
276 if urlerr, ok := err.(*url.Error); ok {
277 if neterr, ok := urlerr.Err.(*net.OpError); ok && isLocalHost(target.Hostname()) {
278 result = CheckResult{
280 Error: neterr.Error(),
287 result.Error = err.Error()
290 result.HTTPStatusCode = resp.StatusCode
291 result.HTTPStatusText = resp.Status
292 err = json.NewDecoder(resp.Body).Decode(&result.Response)
294 result.Error = fmt.Sprintf("cannot decode response: %s", err)
295 } else if resp.StatusCode != http.StatusOK {
// resp.Status already begins with the numeric code (e.g. "404 Not
// Found"). Formatting StatusCode as well produced "HTTP 404 404 Not Found".
296 result.Error = fmt.Sprintf("HTTP %s", resp.Status)
297 } else if h, _ := result.Response["health"].(string); h != "OK" {
298 if e, ok := result.Response["error"].(string); ok && e != "" {
302 result.Error = fmt.Sprintf("health=%q in ping response", h)
// reMetric matches a Prometheus text-format sample that has exactly one
// label, sha256, e.g. `name{sha256="0123abcd"} 1.7e+09`. It captures the
// metric name, the hash, and the value. The braces are literal because
// they do not form a valid repetition.
// NOTE(review): the value group requires at least two characters, so a
// single-digit value such as "0" does not match.
310 var reMetric = regexp.MustCompile(`([a-z_]+){sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
// metrics fetches the /metrics page from the same host as pingURL,
// authenticated with the cluster ManagementToken and bounded by
// agg.timeout. It extracts the sha256 label and value of the
// arvados_config_source_timestamp_seconds sample into a Metrics.
// A non-200 response or a scanner error is reported through err.
312 func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
313 metricsURL, err := pingURL.Parse("/metrics")
317 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
319 req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
323 req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
325 // Avoid workbench1's redirect-http-to-https feature
326 req.Header.Set("X-Forwarded-Proto", "https")
328 resp, err := agg.httpClient.Do(req)
331 } else if resp.StatusCode != http.StatusOK {
// resp.Status already begins with the numeric code (e.g. "404 Not
// Found"), so StatusCode is not repeated.
332 err = fmt.Errorf("%s: HTTP %s", metricsURL.String(), resp.Status)
336 scanner := bufio.NewScanner(resp.Body)
338 m := reMetric.FindSubmatch(scanner.Bytes())
// Only the config source timestamp sample is of interest.
339 if len(m) != 4 || string(m[1]) != "arvados_config_source_timestamp_seconds" {
342 result.ConfigSourceSHA256 = string(m[2])
// NOTE(review): the ParseFloat error is ignored. A value the regexp
// accepts but strconv rejects (e.g. "1.2.3") becomes 0, the Unix epoch.
343 unixtime, _ := strconv.ParseFloat(string(m[3]), 64)
// Convert fractional seconds to a time.Time with microsecond precision.
344 result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
346 if err = scanner.Err(); err != nil {
347 err = fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
353 // Test whether host is an easily recognizable loopback address:
354 // 0.0.0.0, 127.x.x.x, ::1, or localhost.
// net.ParseIP returns nil for non-literal hosts, and the IP methods used
// below then report false, so hostnames match only "localhost"
// (case-insensitively).
// NOTE(review): the IPv6 unspecified address "::" is not recognized.
// ip.IsUnspecified() would cover both "0.0.0.0" and "::".
355 func isLocalHost(host string) bool {
356 ip := net.ParseIP(host)
357 return ip.IsLoopback() || bytes.Equal(ip.To4(), []byte{0, 0, 0, 0}) || strings.EqualFold(host, "localhost")
// checkAuth checks whether any token in req's credentials (as extracted
// by auth.CredentialsFromRequest) is non-empty and equal to the cluster
// ManagementToken.
// NOTE(review): == on secrets is not constant-time. Consider
// crypto/subtle.ConstantTimeCompare for this externally supplied token.
360 func (agg *Aggregator) checkAuth(req *http.Request) bool {
361 creds := auth.CredentialsFromRequest(req)
362 for _, token := range creds.Tokens {
363 if token != "" && token == agg.Cluster.ManagementToken {
// errSilent marks a failure whose message has already been written
// elsewhere (e.g. by cmd.ParseFlags). RunCommand compares against it and
// does not print it.
370 var errSilent = errors.New("")
// CheckCommand is the command-line entry point that runs a one-off
// cluster health check.
372 var CheckCommand cmd.Handler = checkCommand{}
// checkCommand is the stateless cmd.Handler implementation behind
// CheckCommand.
374 type checkCommand struct{}
// RunCommand implements cmd.Handler. It sets up a JSON-format logger on
// stderr and calls run. An error other than errSilent has its message
// printed to stdout.
// NOTE(review): the sentinel is compared with != rather than errors.Is,
// so run must return errSilent unwrapped.
376 func (ccmd checkCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
377 logger := ctxlog.New(stderr, "json", "info")
378 ctx := ctxlog.Context(context.Background(), logger)
379 err := ccmd.run(ctx, prog, args, stdin, stdout, stderr)
381 if err != errSilent {
382 fmt.Fprintln(stdout, err.Error())
// run implements the health-check command. It parses flags, loads the
// cluster config, probes the cluster with ClusterHealth using the
// -timeout value, and reports the outcome. With -yaml the full report is
// marshaled to YAML (its output path is not visible in this excerpt).
// When the report's Health is not "OK", each error message is printed to
// stdout and "health check failed" to stderr.
389 func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) error {
390 flags := flag.NewFlagSet("", flag.ContinueOnError)
391 flags.SetOutput(stderr)
392 loader := config.NewLoader(stdin, ctxlog.New(stderr, "text", "info"))
393 loader.SetupFlags(flags)
394 versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
395 timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
396 outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode shows errors as plain text, is silent on success)")
397 if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
398 // cmd.ParseFlags already reported the error
400 } else if *versionFlag {
401 cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
404 cfg, err := loader.Load()
408 cluster, err := cfg.GetCluster("")
// From here on, log with the cluster's configured format and level,
// tagged with the cluster ID.
412 logger := ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
413 "ClusterID": cluster.ClusterID,
415 ctx = ctxlog.Context(ctx, logger)
// The -timeout flag overrides defaultTimeout for every probe.
416 agg := Aggregator{Cluster: cluster, timeout: arvados.Duration(*timeout)}
417 resp := agg.ClusterHealth()
419 y, err := yaml.Marshal(resp)
424 if resp.Health != "OK" {
429 if resp.Health != "OK" {
430 for _, msg := range resp.Errors {
431 fmt.Fprintln(stdout, msg)
433 fmt.Fprintln(stderr, "health check failed")