18794: Fail health check on mismatched configs.
[arvados.git] / sdk / go / health / aggregator.go
index ba532dddca7ee108e0af371086a02a71c078f0db..249df5e23b5e74904d061a3bb63fbad2f2e2b2f6 100644 (file)
@@ -5,6 +5,7 @@
 package health
 
 import (
+       "bufio"
        "context"
        "crypto/tls"
        "encoding/json"
@@ -14,6 +15,9 @@ import (
        "io"
        "net/http"
        "net/url"
+       "regexp"
+       "strconv"
+       "strings"
        "sync"
        "time"
 
@@ -107,6 +111,8 @@ type ClusterHealthResponse struct {
        // "service S is needed, but isn't configured to run
        // anywhere."
        Services map[arvados.ServiceName]ServiceHealth `json:"services"`
+
+       Errors []string `json:"errors"`
 }
 
 type CheckResult struct {
@@ -116,6 +122,12 @@ type CheckResult struct {
        HTTPStatusText string                 `json:",omitempty"`
        Response       map[string]interface{} `json:"response"`
        ResponseTime   json.Number            `json:"responseTime"`
+       Metrics        Metrics                `json:"-"`
+}
+
+// Metrics holds the values that Aggregator.metrics extracts from a
+// service's /metrics response. ClusterHealth compares them across
+// services to report config mismatches.
+type Metrics struct {
+       // ConfigSourceTimestamp is the sample value of the
+       // arvados_config_source_timestamp_seconds metric, interpreted
+       // as seconds since the Unix epoch. It is the zero time if the
+       // metric was not found.
+       ConfigSourceTimestamp time.Time
+       // ConfigSourceSHA256 is the "sha256" label of that same
+       // metric. It is empty if the metric was not found.
+       ConfigSourceSHA256    string
 }
 
 type ServiceHealth struct {
@@ -161,6 +173,11 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
                                        }
                                } else {
                                        result = agg.ping(pingURL)
+                                       m, err := agg.metrics(pingURL)
+                                       if err != nil {
+                                               result.Error = "metrics: " + err.Error()
+                                       }
+                                       result.Metrics = m
                                }
 
                                mtx.Lock()
@@ -187,6 +204,27 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
                        break
                }
        }
+
+       var newest Metrics
+       for _, result := range resp.Checks {
+               if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
+                       newest = result.Metrics
+               }
+       }
+       var mismatches []string
+       for target, result := range resp.Checks {
+               if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
+                       mismatches = append(mismatches, target)
+               }
+       }
+       for _, target := range mismatches {
+               msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
+                       strings.TrimSuffix(target, "/_health/ping"),
+                       resp.Checks[target].Metrics.ConfigSourceSHA256,
+                       newest.ConfigSourceTimestamp.Format(time.RFC3339))
+               resp.Errors = append(resp.Errors, msg)
+               resp.Health = "ERROR"
+       }
        return resp
 }
 
@@ -202,7 +240,9 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
        }()
        result.Health = "ERROR"
 
-       req, err := http.NewRequest("GET", target.String(), nil)
+       ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
+       defer cancel()
+       req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
        if err != nil {
                result.Error = err.Error()
                return
@@ -212,9 +252,6 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
        // Avoid workbench1's redirect-http-to-https feature
        req.Header.Set("X-Forwarded-Proto", "https")
 
-       ctx, cancel := context.WithTimeout(req.Context(), time.Duration(agg.timeout))
-       defer cancel()
-       req = req.WithContext(ctx)
        resp, err := agg.httpClient.Do(req)
        if err != nil {
                result.Error = err.Error()
@@ -240,6 +277,49 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
        return
 }
 
+// reMetric matches a metrics sample line that has a single sha256
+// label. Submatches are: 1 = metric name, 2 = sha256 label value
+// (lowercase hex), 3 = sample value. The value must start with a
+// digit and be at least two characters long, e.g. "1.6e+09". The
+// pattern is not anchored, so it can match anywhere in a line.
+var reMetric = regexp.MustCompile(`([a-z_]+){sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
+
+// metrics requests the "/metrics" path on the same host as pingURL
+// and extracts the service's config source hash and timestamp from
+// the arvados_config_source_timestamp_seconds sample in the response.
+//
+// The request carries the cluster's management token and is bounded
+// by agg.timeout. If the response has no matching sample, metrics
+// returns a zero Metrics and a nil error. If several samples match,
+// the last one wins.
+//
+// A non-nil error is returned if the request cannot be built or
+// sent, if the response status is not 200 OK, if a matching sample
+// has an unparseable value, or if reading the response body fails.
+func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
+       metricsURL, err := pingURL.Parse("/metrics")
+       if err != nil {
+               return result, err
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
+       defer cancel()
+       req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
+       if err != nil {
+               return result, err
+       }
+       req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
+
+       // Avoid workbench1's redirect-http-to-https feature
+       req.Header.Set("X-Forwarded-Proto", "https")
+
+       resp, err := agg.httpClient.Do(req)
+       if err != nil {
+               return result, err
+       }
+       // Always release the connection, including on the non-200
+       // path below.
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusOK {
+               // resp.Status already includes the numeric code
+               // (e.g. "404 Not Found"), so don't print it twice.
+               return result, fmt.Errorf("%s: HTTP %s", metricsURL.String(), resp.Status)
+       }
+
+       scanner := bufio.NewScanner(resp.Body)
+       for scanner.Scan() {
+               m := reMetric.FindSubmatch(scanner.Bytes())
+               if len(m) != 4 || string(m[1]) != "arvados_config_source_timestamp_seconds" {
+                       continue
+               }
+               // reMetric accepts some strings that are not valid
+               // floats (e.g. "1.2.3"). Report those instead of
+               // silently recording a 1970 timestamp.
+               unixtime, err := strconv.ParseFloat(string(m[3]), 64)
+               if err != nil {
+                       return result, fmt.Errorf("error parsing response from %s: invalid timestamp %q: %w", metricsURL.String(), m[3], err)
+               }
+               result.ConfigSourceSHA256 = string(m[2])
+               result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
+       }
+       if err := scanner.Err(); err != nil {
+               return result, fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
+       }
+       return result, nil
+}
+
 func (agg *Aggregator) checkAuth(req *http.Request) bool {
        creds := auth.CredentialsFromRequest(req)
        for _, token := range creds.Tokens {