package health
import (
+ "bufio"
"context"
"crypto/tls"
"encoding/json"
"io"
"net/http"
"net/url"
+ "regexp"
+ "strconv"
+ "strings"
"sync"
"time"
// "service S is needed, but isn't configured to run
// anywhere."
Services map[arvados.ServiceName]ServiceHealth `json:"services"`
+
+ Errors []string `json:"errors"`
}
type CheckResult struct {
HTTPStatusText string `json:",omitempty"`
Response map[string]interface{} `json:"response"`
ResponseTime json.Number `json:"responseTime"`
+ Metrics Metrics `json:"-"`
+}
+
// Metrics is the config-file information a service reports on its
// /metrics endpoint, as extracted by the aggregator.
type Metrics struct {
	// ConfigSourceTimestamp is the timestamp the config file had when
	// the service loaded it. It is the zero time when the service did
	// not report it.
	ConfigSourceTimestamp time.Time
	// ConfigSourceSHA256 is the hex-encoded sha256 of the config file
	// content the service loaded. It is empty when not reported.
	ConfigSourceSHA256 string
}
type ServiceHealth struct {
}
} else {
result = agg.ping(pingURL)
+ m, err := agg.metrics(pingURL)
+ if err != nil {
+ result.Error = "metrics: " + err.Error()
+ }
+ result.Metrics = m
}
mtx.Lock()
break
}
}
+
+ var newest Metrics
+ for _, result := range resp.Checks {
+ if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
+ newest = result.Metrics
+ }
+ }
+ var mismatches []string
+ for target, result := range resp.Checks {
+ if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
+ mismatches = append(mismatches, target)
+ }
+ }
+ for _, target := range mismatches {
+ msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
+ strings.TrimSuffix(target, "/_health/ping"),
+ resp.Checks[target].Metrics.ConfigSourceSHA256,
+ newest.ConfigSourceTimestamp.Format(time.RFC3339))
+ resp.Errors = append(resp.Errors, msg)
+ resp.Health = "ERROR"
+ }
return resp
}
}()
result.Health = "ERROR"
- req, err := http.NewRequest("GET", target.String(), nil)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
+ defer cancel()
+ req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
if err != nil {
result.Error = err.Error()
return
// Avoid workbench1's redirect-http-to-https feature
req.Header.Set("X-Forwarded-Proto", "https")
- ctx, cancel := context.WithTimeout(req.Context(), time.Duration(agg.timeout))
- defer cancel()
- req = req.WithContext(ctx)
resp, err := agg.httpClient.Do(req)
if err != nil {
result.Error = err.Error()
return
}
+var reMetric = regexp.MustCompile(`([a-z_]+){sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
+
+func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
+ metricsURL, err := pingURL.Parse("/metrics")
+ if err != nil {
+ return
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
+ defer cancel()
+ req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
+ if err != nil {
+ return
+ }
+ req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
+
+ // Avoid workbench1's redirect-http-to-https feature
+ req.Header.Set("X-Forwarded-Proto", "https")
+
+ resp, err := agg.httpClient.Do(req)
+ if err != nil {
+ return
+ } else if resp.StatusCode != http.StatusOK {
+ err = fmt.Errorf("%s: HTTP %d %s", metricsURL.String(), resp.StatusCode, resp.Status)
+ return
+ }
+
+ scanner := bufio.NewScanner(resp.Body)
+ for scanner.Scan() {
+ m := reMetric.FindSubmatch(scanner.Bytes())
+ if len(m) != 4 || string(m[1]) != "arvados_config_source_timestamp_seconds" {
+ continue
+ }
+ result.ConfigSourceSHA256 = string(m[2])
+ unixtime, _ := strconv.ParseFloat(string(m[3]), 64)
+ result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
+ }
+ if err = scanner.Err(); err != nil {
+ err = fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
+ return
+ }
+ return
+}
+
func (agg *Aggregator) checkAuth(req *http.Request) bool {
creds := auth.CredentialsFromRequest(req)
for _, token := range creds.Tokens {
import (
"bytes"
+ "crypto/sha256"
"encoding/json"
+ "fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
c.Logf("%#v", ep)
}
+func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
+ // time1/hash1: current config
+ time1 := time.Now().Add(time.Second - time.Minute - time.Hour)
+ hash1 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: xyzzy}}`)))
+ // time2/hash2: old config
+ time2 := time1.Add(-time.Hour)
+ hash2 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: old-token}}`)))
+
+ // srv1: current file
+ handler1 := healthyHandler{configHash: hash1, configTime: time1}
+ srv1, listen1 := s.stubServer(&handler1)
+ defer srv1.Close()
+ // srv2: old file, current content
+ handler2 := healthyHandler{configHash: hash1, configTime: time2}
+ srv2, listen2 := s.stubServer(&handler2)
+ defer srv2.Close()
+ // srv3: old file, old content
+ handler3 := healthyHandler{configHash: hash2, configTime: time2}
+ srv3, listen3 := s.stubServer(&handler3)
+ defer srv3.Close()
+ // srv4: no metrics handler
+ handler4 := healthyHandler{}
+ srv4, listen4 := s.stubServer(&handler4)
+ defer srv4.Close()
+
+ s.setAllServiceURLs(listen1)
+
+ // listen2 => old timestamp, same content => no problem
+ s.resp = httptest.NewRecorder()
+ arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
+ "http://localhost"+listen2+"/")
+ s.handler.ServeHTTP(s.resp, s.req)
+ resp := s.checkOK(c)
+
+ // listen4 => no metrics on some services => no problem
+ s.resp = httptest.NewRecorder()
+ arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
+ "http://localhost"+listen4+"/")
+ s.handler.ServeHTTP(s.resp, s.req)
+ resp = s.checkOK(c)
+
+ // listen3 => old timestamp, old content => report discrepancy
+ s.resp = httptest.NewRecorder()
+ arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore,
+ "http://localhost"+listen1+"/",
+ "http://localhost"+listen3+"/")
+ s.handler.ServeHTTP(s.resp, s.req)
+ resp = s.checkUnhealthy(c)
+ if c.Check(len(resp.Errors) > 0, check.Equals, true) {
+ c.Check(resp.Errors[0], check.Matches, `outdated config: \Qkeepstore+http://localhost`+listen3+`\E: config file \(sha256 .*\) does not match latest version with timestamp .*`)
+ }
+
+ // no services report config time (migrating to current version) => no problem
+ s.resp = httptest.NewRecorder()
+ s.setAllServiceURLs(listen4)
+ s.handler.ServeHTTP(s.resp, s.req)
+ s.checkOK(c)
+}
+
func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
s.handler.timeout = arvados.Duration(100 * time.Millisecond)
srv, listen := s.stubServer(&slowHandler{})
tmpdir := c.MkDir()
confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
c.Assert(err, check.IsNil)
- confdata = regexp.MustCompile(`SourceTimestamp: [^\n]+\n`).ReplaceAll(confdata, []byte{})
+ confdata = regexp.MustCompile(`Source(Timestamp|SHA256): [^\n]+\n`).ReplaceAll(confdata, []byte{})
err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
c.Assert(err, check.IsNil)
var stdout, stderr bytes.Buffer
}
}
-type healthyHandler struct{}
// healthyHandler is a stub service for aggregator tests. It answers
// authenticated health checks with OK. On /metrics it reports
// configHash and configTime as the config it has loaded.
type healthyHandler struct {
	// configHash is reported as the sha256 label of the config metrics.
	configHash string
	// configTime is reported as the config source timestamp. The zero
	// value means "report the current time".
	configTime time.Time
}
-func (*healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
if req.URL.Path == "/_health/ping" {
+ if !authOK {
+ http.Error(resp, "unauthorized", http.StatusUnauthorized)
+ return
+ }
resp.Write([]byte(`{"health":"OK"}`))
+ } else if req.URL.Path == "/metrics" {
+ if !authOK {
+ http.Error(resp, "unauthorized", http.StatusUnauthorized)
+ return
+ }
+ t := h.configTime
+ if t.IsZero() {
+ t = time.Now()
+ }
+ fmt.Fprintf(resp, `# HELP arvados_config_load_timestamp_seconds Time when config file was loaded.
+# TYPE arvados_config_load_timestamp_seconds gauge
+arvados_config_load_timestamp_seconds{sha256="%s"} %g
+# HELP arvados_config_source_timestamp_seconds Timestamp of config file when it was loaded.
+# TYPE arvados_config_source_timestamp_seconds gauge
+arvados_config_source_timestamp_seconds{sha256="%s"} %g
+`,
+ h.configHash, float64(time.Now().UnixNano())/1e9,
+ h.configHash, float64(t.UnixNano())/1e9)
} else {
http.Error(resp, "not found", http.StatusNotFound)
}