From: Tom Clegg
Date: Mon, 11 Apr 2022 15:18:50 +0000 (-0400)
Subject: 18794: Fail health check on mismatched configs.
X-Git-Tag: 2.5.0~184^2~21
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/fe6606652dfe274627eadcef902d6e30d2856440

18794: Fail health check on mismatched configs.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---

diff --git a/sdk/go/health/aggregator.go b/sdk/go/health/aggregator.go
index ba532dddca..249df5e23b 100644
--- a/sdk/go/health/aggregator.go
+++ b/sdk/go/health/aggregator.go
@@ -5,6 +5,7 @@
 package health
 
 import (
+	"bufio"
 	"context"
 	"crypto/tls"
 	"encoding/json"
@@ -14,6 +15,10 @@ import (
 	"io"
 	"net/http"
 	"net/url"
+	"regexp"
+	"sort"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -107,6 +112,10 @@ type ClusterHealthResponse struct {
 	// "service S is needed, but isn't configured to run
 	// anywhere."
 	Services map[arvados.ServiceName]ServiceHealth `json:"services"`
+
+	// Errors describes problems detected by comparing the
+	// services' check results, such as outdated config files.
+	Errors []string `json:"errors"`
 }
 
 type CheckResult struct {
@@ -116,6 +125,14 @@ type CheckResult struct {
 	HTTPStatusText string                 `json:",omitempty"`
 	Response       map[string]interface{} `json:"response"`
 	ResponseTime   json.Number            `json:"responseTime"`
+	Metrics        Metrics                `json:"-"`
+}
+
+// Metrics holds the config file information that a service
+// reports at its /metrics endpoint.
+type Metrics struct {
+	ConfigSourceTimestamp time.Time
+	ConfigSourceSHA256    string
 }
 
 type ServiceHealth struct {
@@ -161,6 +178,12 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
 					}
 				} else {
 					result = agg.ping(pingURL)
+					m, err := agg.metrics(pingURL)
+					if err != nil && result.Error == "" {
+						// Keep the (more informative) ping error, if any.
+						result.Error = "metrics: " + err.Error()
+					}
+					result.Metrics = m
 				}
 
 				mtx.Lock()
@@ -187,6 +210,32 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
 			break
 		}
 	}
+
+	// Find the newest config file reported by any service.
+	var newest Metrics
+	for _, result := range resp.Checks {
+		if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
+			newest = result.Metrics
+		}
+	}
+	// Flag every service that reports a config hash different from
+	// the newest one. Services that don't report a hash are skipped.
+	var mismatches []string
+	for target, result := range resp.Checks {
+		if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
+			mismatches = append(mismatches, target)
+		}
+	}
+	// Map iteration order is random; sort so Errors is stable.
+	sort.Strings(mismatches)
+	for _, target := range mismatches {
+		msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
+			strings.TrimSuffix(target, "/_health/ping"),
+			resp.Checks[target].Metrics.ConfigSourceSHA256,
+			newest.ConfigSourceTimestamp.Format(time.RFC3339))
+		resp.Errors = append(resp.Errors, msg)
+		resp.Health = "ERROR"
+	}
 	return resp
 }
 
@@ -202,7 +251,9 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
 	}()
 
 	result.Health = "ERROR"
-	req, err := http.NewRequest("GET", target.String(), nil)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
+	defer cancel()
+	req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
 	if err != nil {
 		result.Error = err.Error()
 		return
@@ -212,9 +263,6 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
 	// Avoid workbench1's redirect-http-to-https feature
 	req.Header.Set("X-Forwarded-Proto", "https")
 
-	ctx, cancel := context.WithTimeout(req.Context(), time.Duration(agg.timeout))
-	defer cancel()
-	req = req.WithContext(ctx)
 	resp, err := agg.httpClient.Do(req)
 	if err != nil {
 		result.Error = err.Error()
@@ -240,6 +288,65 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
 	return
 }
 
+// reMetric matches a Prometheus text-format sample that has a single
+// sha256 label. Submatches are the metric name, the label value, and
+// the sample value.
+var reMetric = regexp.MustCompile(`([a-z_]+){sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
+
+// metrics retrieves the /metrics page of the service whose health
+// check URL is pingURL, and returns the config file timestamp and
+// SHA256 from its arvados_config_source_timestamp_seconds metric. If
+// the page does not include that metric, the zero Metrics is returned.
+func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
+	metricsURL, err := pingURL.Parse("/metrics")
+	if err != nil {
+		return
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
+	defer cancel()
+	req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
+	if err != nil {
+		return
+	}
+	req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
+
+	// Avoid workbench1's redirect-http-to-https feature
+	req.Header.Set("X-Forwarded-Proto", "https")
+
+	resp, err := agg.httpClient.Do(req)
+	if err != nil {
+		return
+	}
+	// Close the body so the connection can be released/reused.
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		// resp.Status already includes the numeric code, e.g., "404 Not Found".
+		err = fmt.Errorf("%s: HTTP %s", metricsURL.String(), resp.Status)
+		return
+	}
+
+	scanner := bufio.NewScanner(resp.Body)
+	for scanner.Scan() {
+		m := reMetric.FindSubmatch(scanner.Bytes())
+		if len(m) != 4 || string(m[1]) != "arvados_config_source_timestamp_seconds" {
+			continue
+		}
+		unixtime, perr := strconv.ParseFloat(string(m[3]), 64)
+		if perr != nil {
+			// Skip a malformed value (e.g., "1.2.3" matches the
+			// regexp) instead of reporting a 1970 timestamp.
+			continue
+		}
+		result.ConfigSourceSHA256 = string(m[2])
+		result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
+	}
+	if err = scanner.Err(); err != nil {
+		err = fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
+		return
+	}
+	return
+}
+
 func (agg *Aggregator) checkAuth(req *http.Request) bool {
 	creds := auth.CredentialsFromRequest(req)
 	for _, token := range creds.Tokens {
diff --git a/sdk/go/health/aggregator_test.go b/sdk/go/health/aggregator_test.go
index 62eca894b7..5d76c19f29 100644
--- a/sdk/go/health/aggregator_test.go
+++ b/sdk/go/health/aggregator_test.go
@@ -6,7 +6,9 @@ package health
 
 import (
 	"bytes"
+	"crypto/sha256"
 	"encoding/json"
+	"fmt"
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
@@ -121,6 +123,65 @@ func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
 	c.Logf("%#v", ep)
 }
 
+func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
+	// time1/hash1: current config
+	time1 := time.Now().Add(time.Second - time.Minute - time.Hour)
+	hash1 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: xyzzy}}`)))
+	// time2/hash2: old config
+	time2 := time1.Add(-time.Hour)
+	hash2 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: old-token}}`)))
+
+	// srv1: current file
+	handler1 := healthyHandler{configHash: hash1, configTime: time1}
+	srv1, listen1 := s.stubServer(&handler1)
+	defer srv1.Close()
+	// srv2: old file, current content
+	handler2 := healthyHandler{configHash: hash1, configTime: time2}
+	srv2, listen2 := s.stubServer(&handler2)
+	defer srv2.Close()
+	// srv3: old file, old content
+	handler3 := healthyHandler{configHash: hash2, configTime: time2}
+	srv3, listen3 := s.stubServer(&handler3)
+	defer srv3.Close()
+	// srv4: /metrics does not report a config hash
+	handler4 := healthyHandler{}
+	srv4, listen4 := s.stubServer(&handler4)
+	defer srv4.Close()
+
+	s.setAllServiceURLs(listen1)
+
+	// listen2 => old timestamp, same content => no problem
+	s.resp = httptest.NewRecorder()
+	arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
+		"http://localhost"+listen2+"/")
+	s.handler.ServeHTTP(s.resp, s.req)
+	resp := s.checkOK(c)
+
+	// listen4 => no metrics on some services => no problem
+	s.resp = httptest.NewRecorder()
+	arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
+		"http://localhost"+listen4+"/")
+	s.handler.ServeHTTP(s.resp, s.req)
+	resp = s.checkOK(c)
+
+	// listen3 => old timestamp, old content => report discrepancy
+	s.resp = httptest.NewRecorder()
+	arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore,
+		"http://localhost"+listen1+"/",
+		"http://localhost"+listen3+"/")
+	s.handler.ServeHTTP(s.resp, s.req)
+	resp = s.checkUnhealthy(c)
+	if c.Check(len(resp.Errors) > 0, check.Equals, true) {
+		c.Check(resp.Errors[0], check.Matches, `outdated config: \Qkeepstore+http://localhost`+listen3+`\E: config file \(sha256 .*\) does not match latest version with timestamp .*`)
+	}
+
+	// no services report config time (migrating to current version) => no problem
+	s.resp = httptest.NewRecorder()
+	s.setAllServiceURLs(listen4)
+	s.handler.ServeHTTP(s.resp, s.req)
+	s.checkOK(c)
+}
+
 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
 	s.handler.timeout = arvados.Duration(100 * time.Millisecond)
 	srv, listen := s.stubServer(&slowHandler{})
@@ -143,7 +204,7 @@ func (s *AggregatorSuite) TestCheckCommand(c *check.C) {
 	tmpdir := c.MkDir()
 	confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
 	c.Assert(err, check.IsNil)
-	confdata = regexp.MustCompile(`SourceTimestamp: [^\n]+\n`).ReplaceAll(confdata, []byte{})
+	confdata = regexp.MustCompile(`Source(Timestamp|SHA256): [^\n]+\n`).ReplaceAll(confdata, []byte{})
 	err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
 	c.Assert(err, check.IsNil)
 	var stdout, stderr bytes.Buffer
@@ -209,11 +270,37 @@ func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 	}
 }
 
-type healthyHandler struct{}
+type healthyHandler struct {
+	configHash string
+	configTime time.Time
+}
 
-func (*healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
 	if req.URL.Path == "/_health/ping" {
+		if !authOK {
+			http.Error(resp, "unauthorized", http.StatusUnauthorized)
+			return
+		}
 		resp.Write([]byte(`{"health":"OK"}`))
+	} else if req.URL.Path == "/metrics" {
+		if !authOK {
+			http.Error(resp, "unauthorized", http.StatusUnauthorized)
+			return
+		}
+		t := h.configTime
+		if t.IsZero() {
+			t = time.Now()
+		}
+		fmt.Fprintf(resp, `# HELP arvados_config_load_timestamp_seconds Time when config file was loaded.
+# TYPE arvados_config_load_timestamp_seconds gauge
+arvados_config_load_timestamp_seconds{sha256="%s"} %g
+# HELP arvados_config_source_timestamp_seconds Timestamp of config file when it was loaded.
+# TYPE arvados_config_source_timestamp_seconds gauge
+arvados_config_source_timestamp_seconds{sha256="%s"} %g
+`,
+			h.configHash, float64(time.Now().UnixNano())/1e9,
+			h.configHash, float64(t.UnixNano())/1e9)
 	} else {
 		http.Error(resp, "not found", http.StatusNotFound)
 	}