18794: Fail health check on mismatched configs.
author Tom Clegg <tom@curii.com>
Mon, 11 Apr 2022 15:18:50 +0000 (11:18 -0400)
committer Tom Clegg <tom@curii.com>
Tue, 26 Apr 2022 15:19:20 +0000 (11:19 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/health/aggregator.go
sdk/go/health/aggregator_test.go

index ba532dddca7ee108e0af371086a02a71c078f0db..249df5e23b5e74904d061a3bb63fbad2f2e2b2f6 100644 (file)
@@ -5,6 +5,7 @@
 package health
 
 import (
 package health
 
 import (
+       "bufio"
        "context"
        "crypto/tls"
        "encoding/json"
        "context"
        "crypto/tls"
        "encoding/json"
@@ -14,6 +15,9 @@ import (
        "io"
        "net/http"
        "net/url"
        "io"
        "net/http"
        "net/url"
+       "regexp"
+       "strconv"
+       "strings"
        "sync"
        "time"
 
        "sync"
        "time"
 
@@ -107,6 +111,8 @@ type ClusterHealthResponse struct {
        // "service S is needed, but isn't configured to run
        // anywhere."
        Services map[arvados.ServiceName]ServiceHealth `json:"services"`
        // "service S is needed, but isn't configured to run
        // anywhere."
        Services map[arvados.ServiceName]ServiceHealth `json:"services"`
+
+       Errors []string `json:"errors"`
 }
 
 type CheckResult struct {
 }
 
 type CheckResult struct {
@@ -116,6 +122,12 @@ type CheckResult struct {
        HTTPStatusText string                 `json:",omitempty"`
        Response       map[string]interface{} `json:"response"`
        ResponseTime   json.Number            `json:"responseTime"`
        HTTPStatusText string                 `json:",omitempty"`
        Response       map[string]interface{} `json:"response"`
        ResponseTime   json.Number            `json:"responseTime"`
+       Metrics        Metrics                `json:"-"`
+}
+
+// Metrics holds the config-file identity that a service reports on
+// its "/metrics" endpoint. It is attached to a CheckResult (excluded
+// from that struct's JSON encoding) so ClusterHealth can compare
+// services against each other and flag those running an outdated
+// config.
+type Metrics struct {
+       // ConfigSourceTimestamp is the value of the service's
+       // arvados_config_source_timestamp_seconds sample, converted
+       // to a time.Time. It is the zero time when no such sample
+       // was found.
+       ConfigSourceTimestamp time.Time
+       // ConfigSourceSHA256 is the sha256 label attached to that
+       // same sample. It is empty when no such sample was found.
+       ConfigSourceSHA256    string
 }
 
 type ServiceHealth struct {
 }
 
 type ServiceHealth struct {
@@ -161,6 +173,11 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
                                        }
                                } else {
                                        result = agg.ping(pingURL)
                                        }
                                } else {
                                        result = agg.ping(pingURL)
+                                       m, err := agg.metrics(pingURL)
+                                       if err != nil {
+                                               result.Error = "metrics: " + err.Error()
+                                       }
+                                       result.Metrics = m
                                }
 
                                mtx.Lock()
                                }
 
                                mtx.Lock()
@@ -187,6 +204,27 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
                        break
                }
        }
                        break
                }
        }
+
+       var newest Metrics
+       for _, result := range resp.Checks {
+               if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
+                       newest = result.Metrics
+               }
+       }
+       var mismatches []string
+       for target, result := range resp.Checks {
+               if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
+                       mismatches = append(mismatches, target)
+               }
+       }
+       for _, target := range mismatches {
+               msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
+                       strings.TrimSuffix(target, "/_health/ping"),
+                       resp.Checks[target].Metrics.ConfigSourceSHA256,
+                       newest.ConfigSourceTimestamp.Format(time.RFC3339))
+               resp.Errors = append(resp.Errors, msg)
+               resp.Health = "ERROR"
+       }
        return resp
 }
 
        return resp
 }
 
@@ -202,7 +240,9 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
        }()
        result.Health = "ERROR"
 
        }()
        result.Health = "ERROR"
 
-       req, err := http.NewRequest("GET", target.String(), nil)
+       ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
+       defer cancel()
+       req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
        if err != nil {
                result.Error = err.Error()
                return
        if err != nil {
                result.Error = err.Error()
                return
@@ -212,9 +252,6 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
        // Avoid workbench1's redirect-http-to-https feature
        req.Header.Set("X-Forwarded-Proto", "https")
 
        // Avoid workbench1's redirect-http-to-https feature
        req.Header.Set("X-Forwarded-Proto", "https")
 
-       ctx, cancel := context.WithTimeout(req.Context(), time.Duration(agg.timeout))
-       defer cancel()
-       req = req.WithContext(ctx)
        resp, err := agg.httpClient.Do(req)
        if err != nil {
                result.Error = err.Error()
        resp, err := agg.httpClient.Do(req)
        if err != nil {
                result.Error = err.Error()
@@ -240,6 +277,49 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
        return
 }
 
        return
 }
 
+// reMetric matches a metrics sample line of the form
+//
+//     name{sha256="<hex digest>"} <number>
+//
+// and captures the metric name, the sha256 label value, and the
+// sample value, in that order.
+var reMetric = regexp.MustCompile(`([a-z_]+){sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
+
+// metrics fetches "/metrics" from the host that serves pingURL,
+// authenticating with the cluster's management token, and extracts
+// the config file hash and timestamp from the
+// arvados_config_source_timestamp_seconds sample.
+//
+// If the response contains no such sample, the returned Metrics is
+// the zero value and err is nil. If several samples match, the last
+// one wins. On any error (request failure, non-200 status, unreadable
+// or unparseable response) the returned Metrics is the zero value.
+func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
+       metricsURL, err := pingURL.Parse("/metrics")
+       if err != nil {
+               return Metrics{}, err
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
+       defer cancel()
+       req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
+       if err != nil {
+               return Metrics{}, err
+       }
+       req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
+
+       // Avoid workbench1's redirect-http-to-https feature
+       req.Header.Set("X-Forwarded-Proto", "https")
+
+       resp, err := agg.httpClient.Do(req)
+       if err != nil {
+               return Metrics{}, err
+       }
+       // Release the connection on every path below, including the
+       // non-200 early return.
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusOK {
+               // resp.Status already begins with the numeric code
+               // (e.g. "404 Not Found"), so don't print the code
+               // separately.
+               return Metrics{}, fmt.Errorf("%s: HTTP %s", metricsURL.String(), resp.Status)
+       }
+
+       scanner := bufio.NewScanner(resp.Body)
+       for scanner.Scan() {
+               m := reMetric.FindSubmatch(scanner.Bytes())
+               if len(m) != 4 || string(m[1]) != "arvados_config_source_timestamp_seconds" {
+                       continue
+               }
+               // The regexp accepts some strings that are not valid
+               // floats (e.g. "1.2.3"); report those instead of
+               // silently recording the Unix epoch as the timestamp.
+               unixtime, err := strconv.ParseFloat(string(m[3]), 64)
+               if err != nil {
+                       return Metrics{}, fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
+               }
+               result.ConfigSourceSHA256 = string(m[2])
+               result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
+       }
+       if err := scanner.Err(); err != nil {
+               return Metrics{}, fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
+       }
+       return result, nil
+}
+
+
 func (agg *Aggregator) checkAuth(req *http.Request) bool {
        creds := auth.CredentialsFromRequest(req)
        for _, token := range creds.Tokens {
 func (agg *Aggregator) checkAuth(req *http.Request) bool {
        creds := auth.CredentialsFromRequest(req)
        for _, token := range creds.Tokens {
index 62eca894b7d1719f2c35491f18f811b88c33e466..5d76c19f29bac94def16b00c7441f5874d4140c4 100644 (file)
@@ -6,7 +6,9 @@ package health
 
 import (
        "bytes"
 
 import (
        "bytes"
+       "crypto/sha256"
        "encoding/json"
        "encoding/json"
+       "fmt"
        "io/ioutil"
        "net/http"
        "net/http/httptest"
        "io/ioutil"
        "net/http"
        "net/http/httptest"
@@ -121,6 +123,65 @@ func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
        c.Logf("%#v", ep)
 }
 
        c.Logf("%#v", ep)
 }
 
+func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
+       // time1/hash1: current config
+       time1 := time.Now().Add(time.Second - time.Minute - time.Hour)
+       hash1 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: xyzzy}}`)))
+       // time2/hash2: old config
+       time2 := time1.Add(-time.Hour)
+       hash2 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: old-token}}`)))
+
+       // srv1: current file
+       handler1 := healthyHandler{configHash: hash1, configTime: time1}
+       srv1, listen1 := s.stubServer(&handler1)
+       defer srv1.Close()
+       // srv2: old file, current content
+       handler2 := healthyHandler{configHash: hash1, configTime: time2}
+       srv2, listen2 := s.stubServer(&handler2)
+       defer srv2.Close()
+       // srv3: old file, old content
+       handler3 := healthyHandler{configHash: hash2, configTime: time2}
+       srv3, listen3 := s.stubServer(&handler3)
+       defer srv3.Close()
+       // srv4: no metrics handler
+       handler4 := healthyHandler{}
+       srv4, listen4 := s.stubServer(&handler4)
+       defer srv4.Close()
+
+       s.setAllServiceURLs(listen1)
+
+       // listen2 => old timestamp, same content => no problem
+       s.resp = httptest.NewRecorder()
+       arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
+               "http://localhost"+listen2+"/")
+       s.handler.ServeHTTP(s.resp, s.req)
+       resp := s.checkOK(c)
+
+       // listen4 => no metrics on some services => no problem
+       s.resp = httptest.NewRecorder()
+       arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
+               "http://localhost"+listen4+"/")
+       s.handler.ServeHTTP(s.resp, s.req)
+       resp = s.checkOK(c)
+
+       // listen3 => old timestamp, old content => report discrepancy
+       s.resp = httptest.NewRecorder()
+       arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore,
+               "http://localhost"+listen1+"/",
+               "http://localhost"+listen3+"/")
+       s.handler.ServeHTTP(s.resp, s.req)
+       resp = s.checkUnhealthy(c)
+       if c.Check(len(resp.Errors) > 0, check.Equals, true) {
+               c.Check(resp.Errors[0], check.Matches, `outdated config: \Qkeepstore+http://localhost`+listen3+`\E: config file \(sha256 .*\) does not match latest version with timestamp .*`)
+       }
+
+       // no services report config time (migrating to current version) => no problem
+       s.resp = httptest.NewRecorder()
+       s.setAllServiceURLs(listen4)
+       s.handler.ServeHTTP(s.resp, s.req)
+       s.checkOK(c)
+}
+
 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
        s.handler.timeout = arvados.Duration(100 * time.Millisecond)
        srv, listen := s.stubServer(&slowHandler{})
 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
        s.handler.timeout = arvados.Duration(100 * time.Millisecond)
        srv, listen := s.stubServer(&slowHandler{})
@@ -143,7 +204,7 @@ func (s *AggregatorSuite) TestCheckCommand(c *check.C) {
        tmpdir := c.MkDir()
        confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
        c.Assert(err, check.IsNil)
        tmpdir := c.MkDir()
        confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
        c.Assert(err, check.IsNil)
-       confdata = regexp.MustCompile(`SourceTimestamp: [^\n]+\n`).ReplaceAll(confdata, []byte{})
+       confdata = regexp.MustCompile(`Source(Timestamp|SHA256): [^\n]+\n`).ReplaceAll(confdata, []byte{})
        err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
        c.Assert(err, check.IsNil)
        var stdout, stderr bytes.Buffer
        err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
        c.Assert(err, check.IsNil)
        var stdout, stderr bytes.Buffer
@@ -209,11 +270,37 @@ func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
        }
 }
 
        }
 }
 
-type healthyHandler struct{}
+// healthyHandler is a stub service for tests. Its ServeHTTP method
+// answers "/_health/ping" with an OK health response and "/metrics"
+// with config timestamp samples, provided the request carries the
+// management token.
+type healthyHandler struct {
+       // configHash is written as the sha256 label on the config
+       // timestamp samples in the "/metrics" response.
+       configHash string
+       // configTime is reported as the config source timestamp in
+       // the "/metrics" response; when zero, the current time is
+       // reported instead.
+       configTime time.Time
+}
 
 
-func (*healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
        if req.URL.Path == "/_health/ping" {
        if req.URL.Path == "/_health/ping" {
+               if !authOK {
+                       http.Error(resp, "unauthorized", http.StatusUnauthorized)
+                       return
+               }
                resp.Write([]byte(`{"health":"OK"}`))
                resp.Write([]byte(`{"health":"OK"}`))
+       } else if req.URL.Path == "/metrics" {
+               if !authOK {
+                       http.Error(resp, "unauthorized", http.StatusUnauthorized)
+                       return
+               }
+               t := h.configTime
+               if t.IsZero() {
+                       t = time.Now()
+               }
+               fmt.Fprintf(resp, `# HELP arvados_config_load_timestamp_seconds Time when config file was loaded.
+# TYPE arvados_config_load_timestamp_seconds gauge
+arvados_config_load_timestamp_seconds{sha256="%s"} %g
+# HELP arvados_config_source_timestamp_seconds Timestamp of config file when it was loaded.
+# TYPE arvados_config_source_timestamp_seconds gauge
+arvados_config_source_timestamp_seconds{sha256="%s"} %g
+`,
+                       h.configHash, float64(time.Now().UnixNano())/1e9,
+                       h.configHash, float64(t.UnixNano())/1e9)
        } else {
                http.Error(resp, "not found", http.StatusNotFound)
        }
        } else {
                http.Error(resp, "not found", http.StatusNotFound)
        }