1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
24 "git.arvados.org/arvados.git/lib/cmd"
25 "git.arvados.org/arvados.git/lib/config"
26 "git.arvados.org/arvados.git/sdk/go/arvados"
27 "git.arvados.org/arvados.git/sdk/go/auth"
28 "git.arvados.org/arvados.git/sdk/go/ctxlog"
29 "github.com/ghodss/yaml"
30 "github.com/sirupsen/logrus"
// defaultTimeout (2 seconds) is the value setup assigns to
// Aggregator.timeout, and the default for the check command's
// -timeout flag.
33 const defaultTimeout = arvados.Duration(2 * time.Second)
35 // Aggregator implements service.Handler. It handles "GET /_health/all"
36 // by checking the health of all configured services on the cluster
37 // and responding 200 if everything is healthy.
//
// ServeHTTP also answers "/_health/ping", and rejects requests that
// do not carry a token equal to Cluster.ManagementToken.
38 type Aggregator struct {
// httpClient is created by setup and used for every outgoing ping
// and metrics request.
40 httpClient *http.Client
// timeout bounds each individual outgoing ping or metrics request.
41 timeout arvados.Duration
// Cluster supplies the services to check, the management token
// sent with (and required by) requests, and the TLS.Insecure
// setting.
43 Cluster *arvados.Cluster
45 // If non-nil, Log is called after handling each request.
46 Log func(*http.Request, error)
// setup initializes agg.httpClient with a transport whose TLS
// configuration skips certificate verification when
// Cluster.TLS.Insecure is true, and applies defaultTimeout.
// ServeHTTP and ClusterHealth invoke it through agg.setupOnce.
49 func (agg *Aggregator) setup() {
50 agg.httpClient = &http.Client{
51 Transport: &http.Transport{
52 TLSClientConfig: &tls.Config{
53 InsecureSkipVerify: agg.Cluster.TLS.Insecure,
// NOTE(review): presumably the assignment below only happens when
// agg.timeout has not already been set (the check command sets it
// from its -timeout flag before calling ClusterHealth) — confirm.
58 // this is always the case, except in the test suite
59 agg.timeout = defaultTimeout
63 func (agg *Aggregator) CheckHealth() error {
67 func (agg *Aggregator) Done() <-chan struct{} {
// ServeHTTP answers health requests. Requests that fail checkAuth
// get a 401 error; "/_health/all" returns the JSON-encoded
// ClusterHealth result, "/_health/ping" returns healthyBody, and any
// other path gets a 404 error.
71 func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
72 agg.setupOnce.Do(agg.setup)
// sendErr writes statusCode followed by a JSON object of the form
// {"error": "<message>"}.
73 sendErr := func(statusCode int, err error) {
74 resp.WriteHeader(statusCode)
75 json.NewEncoder(resp).Encode(map[string]string{"error": err.Error()})
81 resp.Header().Set("Content-Type", "application/json")
83 if !agg.checkAuth(req) {
84 sendErr(http.StatusUnauthorized, errUnauthorized)
87 if req.URL.Path == "/_health/all" {
// NOTE(review): the Encode error is not checked here.
88 json.NewEncoder(resp).Encode(agg.ClusterHealth())
89 } else if req.URL.Path == "/_health/ping" {
90 resp.Write(healthyBody)
92 sendErr(http.StatusNotFound, errNotFound)
// ClusterHealthResponse is the result of ClusterHealth. ServeHTTP
// encodes it as JSON for "/_health/all", and the check command
// marshals it as YAML.
100 type ClusterHealthResponse struct {
101 // "OK" if all needed services are OK, otherwise "ERROR".
102 Health string `json:"health"`
104 // An entry for each known health check of each known instance
105 // of each needed component: "instance of service S on node N
106 // reports health-check C is OK."
//
// ClusterHealth keys each entry as "<service name>+<ping URL>".
107 Checks map[string]CheckResult `json:"checks"`
109 // An entry for each service type: "service S is OK." This
110 // exposes problems that can't be expressed in Checks, like
111 // "service S is needed, but isn't configured to run
113 Services map[arvados.ServiceName]ServiceHealth `json:"services"`
// Cluster-level error messages, such as the "outdated config"
// reports that ClusterHealth appends.
115 Errors []string `json:"errors"`
// CheckResult describes the outcome of one health check against one
// ping URL.
118 type CheckResult struct {
// Health is compared against "OK" by ClusterHealth; ping starts
// it out as "ERROR".
119 Health string `json:"health"`
// Error is a description of what went wrong, omitted from JSON
// when empty.
120 Error string `json:"error,omitempty"`
// HTTPStatusCode and HTTPStatusText are copied from the ping
// response's StatusCode and Status.
121 HTTPStatusCode int `json:",omitempty"`
122 HTTPStatusText string `json:",omitempty"`
// Response is the decoded JSON body of the ping response.
123 Response map[string]interface{} `json:"response"`
// ResponseTime is elapsed time in seconds, formatted with six
// decimal places.
124 ResponseTime json.Number `json:"responseTime"`
// Metrics holds values scraped from the service's /metrics
// endpoint. It is excluded from JSON output.
125 Metrics Metrics `json:"-"`
// Metrics holds the config-source information parsed from a
// service's arvados_config_source_timestamp_seconds metric.
128 type Metrics struct {
// ConfigSourceTimestamp is the metric's value, interpreted as a
// Unix time in seconds.
129 ConfigSourceTimestamp time.Time
// ConfigSourceSHA256 is the metric's sha256 label.
130 ConfigSourceSHA256 string
// ServiceHealth is the per-service summary stored in
// ClusterHealthResponse.Services.
133 type ServiceHealth struct {
// Health is initialized to "ERROR" by ClusterHealth; any value
// other than "OK" makes the overall cluster health "ERROR".
134 Health string `json:"health"`
// ClusterHealth checks every service listed in agg.Cluster.Services
// and returns the collected results. For each check URL it pings the
// service and fetches its metrics, then records the result in
// resp.Checks. Afterwards it compares the config hashes reported by
// the services and flags any that differ from the most recently
// timestamped one.
138 func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
139 agg.setupOnce.Do(agg.setup)
140 resp := ClusterHealthResponse{
142 Checks: make(map[string]CheckResult),
143 Services: make(map[arvados.ServiceName]ServiceHealth),
147 wg := sync.WaitGroup{}
148 for svcName, svc := range agg.Cluster.Services.Map() {
149 // Ensure svc is listed in resp.Services.
151 if _, ok := resp.Services[svcName]; !ok {
152 resp.Services[svcName] = ServiceHealth{Health: "ERROR"}
// Check every internal URL; if the service has none, fall back to
// its external URL when one is configured.
156 checkURLs := map[arvados.URL]bool{}
157 for addr := range svc.InternalURLs {
158 checkURLs[addr] = true
160 if len(checkURLs) == 0 && svc.ExternalURL.Host != "" {
161 checkURLs[svc.ExternalURL] = true
163 for addr := range checkURLs {
// Each URL is checked in its own goroutine.
// NOTE(review): the goroutines appear to write to resp.Checks,
// resp.Services and resp.Health — confirm those writes are
// serialized (e.g. by a mutex) and awaited via wg.
165 go func(svcName arvados.ServiceName, addr arvados.URL) {
167 var result CheckResult
168 pingURL, err := agg.pingURL(addr)
170 result = CheckResult{
175 result = agg.ping(pingURL)
176 m, err := agg.metrics(pingURL)
178 result.Error = "metrics: " + err.Error()
// Results are keyed by "<service name>+<ping URL>".
185 resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result
186 if result.Health == "OK" {
187 h := resp.Services[svcName]
190 resp.Services[svcName] = h
192 resp.Health = "ERROR"
199 // Report ERROR if a needed service didn't fail any checks
200 // merely because it isn't configured to run anywhere.
201 for _, sh := range resp.Services {
202 if sh.Health != "OK" {
203 resp.Health = "ERROR"
// Find the metrics with the most recent config source timestamp.
209 for _, result := range resp.Checks {
210 if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
211 newest = result.Metrics
// Any check that reported a config hash different from the newest
// one is running with an outdated config. Checks with no hash are
// ignored.
214 var mismatches []string
215 for target, result := range resp.Checks {
216 if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
217 mismatches = append(mismatches, target)
// NOTE(review): mismatches is built by ranging over a map, so the
// order of the resulting error messages is not deterministic unless
// it is sorted in a line not shown here — confirm.
220 for _, target := range mismatches {
221 msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
222 strings.TrimSuffix(target, "/_health/ping"),
223 resp.Checks[target].Metrics.ConfigSourceSHA256,
224 newest.ConfigSourceTimestamp.Format(time.RFC3339))
225 resp.Errors = append(resp.Errors, msg)
226 resp.Health = "ERROR"
// pingURL returns the health-check URL for the service at svcURL,
// obtained by resolving the reference "/_health/ping" against svcURL.
// Because the reference is an absolute path, it replaces any path
// present in svcURL.
231 func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) {
232 base := url.URL(svcURL)
233 return base.Parse("/_health/ping")
// ping sends an authenticated GET request to target, bounded by
// agg.timeout, and reports the outcome. result.Health starts out as
// "ERROR"; result.Error is set when the request cannot be built or
// sent, the body is not valid JSON, the status is not 200, or the
// "health" field of the response is not "OK".
236 func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
// ResponseTime is the time elapsed since t0, in seconds.
239 result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
241 result.Health = "ERROR"
243 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
245 req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
247 result.Error = err.Error()
250 req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
252 // Avoid workbench1's redirect-http-to-https feature
253 req.Header.Set("X-Forwarded-Proto", "https")
255 resp, err := agg.httpClient.Do(req)
257 result.Error = err.Error()
// NOTE(review): no resp.Body.Close() is visible in this function —
// confirm the body is closed so the connection can be reused.
260 result.HTTPStatusCode = resp.StatusCode
261 result.HTTPStatusText = resp.Status
262 err = json.NewDecoder(resp.Body).Decode(&result.Response)
264 result.Error = fmt.Sprintf("cannot decode response: %s", err)
265 } else if resp.StatusCode != http.StatusOK {
// NOTE(review): resp.Status already begins with the numeric code
// (e.g. "503 Service Unavailable"), so this message repeats it.
266 result.Error = fmt.Sprintf("HTTP %d %s", resp.StatusCode, resp.Status)
267 } else if h, _ := result.Response["health"].(string); h != "OK" {
// A non-empty "error" string in the response is checked first;
// otherwise the unexpected health value itself is reported.
268 if e, ok := result.Response["error"].(string); ok && e != "" {
272 result.Error = fmt.Sprintf("health=%q in ping response", h)
// reMetric matches a metrics line of the form
//
//	name{sha256="<hex>"} <value>
//
// capturing the metric name, the sha256 label, and the value.
//
// NOTE(review): the value pattern needs a digit followed by at least
// one more character from [0-9.e+], so a single-digit value or a
// negative exponent will not match in full.
280 var reMetric = regexp.MustCompile(`([a-z_]+){sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
// metrics fetches "/metrics" from the same host as pingURL, using the
// same authentication and timeout as ping, and extracts the config
// source SHA256 and timestamp from the
// arvados_config_source_timestamp_seconds metric. A non-200 response
// or a scanner failure is returned as an error.
282 func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
283 metricsURL, err := pingURL.Parse("/metrics")
287 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
289 req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
293 req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
295 // Avoid workbench1's redirect-http-to-https feature
296 req.Header.Set("X-Forwarded-Proto", "https")
298 resp, err := agg.httpClient.Do(req)
301 } else if resp.StatusCode != http.StatusOK {
// NOTE(review): resp.Status already begins with the numeric code,
// so it appears twice in this message.
302 err = fmt.Errorf("%s: HTTP %d %s", metricsURL.String(), resp.StatusCode, resp.Status)
// NOTE(review): no resp.Body.Close() is visible in this function —
// confirm the body is closed.
306 scanner := bufio.NewScanner(resp.Body)
// Only lines matching reMetric with the expected metric name are
// used.
308 m := reMetric.FindSubmatch(scanner.Bytes())
309 if len(m) != 4 || string(m[1]) != "arvados_config_source_timestamp_seconds" {
312 result.ConfigSourceSHA256 = string(m[2])
// The metric value is a Unix time in (possibly fractional) seconds;
// it is converted to microseconds to build the time.Time.
// NOTE(review): the ParseFloat error is discarded, so an unparsable
// value yields the Unix epoch rather than an error.
313 unixtime, _ := strconv.ParseFloat(string(m[3]), 64)
314 result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
316 if err = scanner.Err(); err != nil {
317 err = fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
// checkAuth inspects the tokens supplied with req, looking for a
// non-empty token equal to the cluster's ManagementToken. Presumably
// it returns true on the first match and false otherwise — confirm.
//
// NOTE(review): the comparison uses ==, which is not constant-time.
323 func (agg *Aggregator) checkAuth(req *http.Request) bool {
324 creds := auth.CredentialsFromRequest(req)
325 for _, token := range creds.Tokens {
// The empty-token check prevents a match when ManagementToken is
// itself empty.
326 if token != "" && token == agg.Cluster.ManagementToken {
// errSilent is an error with an empty message. RunCommand does not
// print it, unlike any other error returned by run.
333 var errSilent = errors.New("")
// CheckCommand is the cmd.Handler that runs a cluster health check
// from the command line.
335 var CheckCommand cmd.Handler = checkCommand{}
// checkCommand is the stateless implementation behind CheckCommand.
337 type checkCommand struct{}
// RunCommand runs the health check with a JSON logger (level "info")
// writing to stderr, and prints any error returned by run other than
// errSilent.
339 func (ccmd checkCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
340 logger := ctxlog.New(stderr, "json", "info")
341 ctx := ctxlog.Context(context.Background(), logger)
342 err := ccmd.run(ctx, prog, args, stdin, stdout, stderr)
344 if err != errSilent {
// NOTE(review): the error text is written to stdout, not stderr —
// confirm this is intended.
345 fmt.Fprintln(stdout, err.Error())
// run parses command-line flags, loads the cluster configuration,
// performs a cluster health check with the requested timeout, and
// marshals the result as YAML. It returns a "health check failed"
// error when the overall health is not "OK".
//
// Flags, in addition to those registered by the config loader:
//
//	-version  write version information to stdout and exit 0
//	-timeout  maximum time to wait for health responses
352 func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) error {
353 flags := flag.NewFlagSet("", flag.ContinueOnError)
354 flags.SetOutput(stderr)
355 loader := config.NewLoader(stdin, ctxlog.New(stderr, "text", "info"))
356 loader.SetupFlags(flags)
357 versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
358 timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
359 if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
360 // cmd.ParseFlags already reported the error
362 } else if *versionFlag {
363 cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
366 cfg, err := loader.Load()
// An empty cluster ID is passed to GetCluster.
370 cluster, err := cfg.GetCluster("")
// From here on, log using the format and level from the cluster's
// SystemLogs configuration.
374 logger := ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
375 "ClusterID": cluster.ClusterID,
377 ctx = ctxlog.Context(ctx, logger)
// The Aggregator is used directly, without going through ServeHTTP,
// so no management-token check applies to the caller.
378 agg := Aggregator{Cluster: cluster, timeout: arvados.Duration(*timeout)}
379 resp := agg.ClusterHealth()
380 buf, err := yaml.Marshal(resp)
385 if resp.Health != "OK" {
386 return fmt.Errorf("health check failed")