16345: "arvados-server check" detects clock skew.
authorTom Clegg <tom@curii.com>
Thu, 5 May 2022 19:37:06 +0000 (15:37 -0400)
committerTom Clegg <tom@curii.com>
Tue, 10 May 2022 17:57:56 +0000 (13:57 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/health/aggregator.go
sdk/go/health/aggregator_test.go
services/health/main.go

index 5e010d88bc1bd32159ebfaf928392b948357cfd4..f473eff3537bba9186d41363d132fd980d94c382 100644 (file)
@@ -29,10 +29,14 @@ import (
        "git.arvados.org/arvados.git/sdk/go/auth"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
 
-const defaultTimeout = arvados.Duration(2 * time.Second)
+const (
+       defaultTimeout = arvados.Duration(2 * time.Second)
+       maxClockSkew   = time.Minute
+)
 
 // Aggregator implements service.Handler. It handles "GET /_health/all"
 // by checking the health of all configured services on the cluster
@@ -46,6 +50,9 @@ type Aggregator struct {
 
        // If non-nil, Log is called after handling each request.
        Log func(*http.Request, error)
+
+       // If non-nil, report clock skew on each health-check.
+       MetricClockSkew prometheus.Gauge
 }
 
 func (agg *Aggregator) setup() {
@@ -114,6 +121,10 @@ type ClusterHealthResponse struct {
        // anywhere."
        Services map[arvados.ServiceName]ServiceHealth `json:"services"`
 
+       // Difference between min/max timestamps in individual
+       // health-check responses.
+       ClockSkew arvados.Duration
+
        Errors []string `json:"errors"`
 }
 
@@ -124,7 +135,9 @@ type CheckResult struct {
        HTTPStatusText string                 `json:",omitempty"`
        Response       map[string]interface{} `json:"response"`
        ResponseTime   json.Number            `json:"responseTime"`
+       ClockTime      time.Time              `json:"clockTime"`
        Metrics        Metrics                `json:"-"`
+       respTime       time.Duration
 }
 
 type Metrics struct {
@@ -225,6 +238,33 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
                }
        }
 
+       var maxResponseTime time.Duration
+       var clockMin, clockMax time.Time
+       for _, result := range resp.Checks {
+               if result.ClockTime.IsZero() {
+                       continue
+               }
+               if clockMin.IsZero() || result.ClockTime.Before(clockMin) {
+                       clockMin = result.ClockTime
+               }
+               if result.ClockTime.After(clockMax) {
+                       clockMax = result.ClockTime
+               }
+               if result.respTime > maxResponseTime {
+                       maxResponseTime = result.respTime
+               }
+       }
+       skew := clockMax.Sub(clockMin)
+       resp.ClockSkew = arvados.Duration(skew)
+       if skew > maxClockSkew+maxResponseTime {
+               msg := fmt.Sprintf("clock skew detected: maximum timestamp spread is %s (exceeds warning threshold of %s)", resp.ClockSkew, arvados.Duration(maxClockSkew))
+               resp.Errors = append(resp.Errors, msg)
+               resp.Health = "ERROR"
+       }
+       if agg.MetricClockSkew != nil {
+               agg.MetricClockSkew.Set(skew.Seconds())
+       }
+
        var newest Metrics
        for _, result := range resp.Checks {
                if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
@@ -256,7 +296,8 @@ func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) {
 func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
        t0 := time.Now()
        defer func() {
-               result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+               result.respTime = time.Since(t0)
+               result.ResponseTime = json.Number(fmt.Sprintf("%.6f", result.respTime.Seconds()))
        }()
        result.Health = "ERROR"
 
@@ -304,6 +345,7 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
                }
        }
        result.Health = "OK"
+       result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
        return
 }
 
index f8f7ff9f1b7e189b83423d017be2a48fd763cf28..5f60cf67f347f2e3ad86d97f9c709aa86833bc7e 100644 (file)
@@ -220,6 +220,40 @@ func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
        s.checkOK(c)
 }
 
+func (s *AggregatorSuite) TestClockSkew(c *check.C) {
+       // srv1: report real wall clock time
+       handler1 := healthyHandler{}
+       srv1, listen1 := s.stubServer(&handler1)
+       defer srv1.Close()
+       // srv2: report near-future time
+       handler2 := healthyHandler{headerDate: time.Now().Add(3 * time.Second)}
+       srv2, listen2 := s.stubServer(&handler2)
+       defer srv2.Close()
+       // srv3: report far-future time
+       handler3 := healthyHandler{headerDate: time.Now().Add(3*time.Minute + 3*time.Second)}
+       srv3, listen3 := s.stubServer(&handler3)
+       defer srv3.Close()
+
+       s.setAllServiceURLs(listen1)
+
+       // near-future time => OK
+       s.resp = httptest.NewRecorder()
+       arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
+               "http://localhost"+listen2+"/")
+       s.handler.ServeHTTP(s.resp, s.req)
+       s.checkOK(c)
+
+       // far-future time => error
+       s.resp = httptest.NewRecorder()
+       arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
+               "http://localhost"+listen3+"/")
+       s.handler.ServeHTTP(s.resp, s.req)
+       resp := s.checkUnhealthy(c)
+       if c.Check(len(resp.Errors) > 0, check.Equals, true) {
+               c.Check(resp.Errors[0], check.Matches, `clock skew detected: maximum timestamp spread is 3m.* \(exceeds warning threshold of 1m\)`)
+       }
+}
+
 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
        s.handler.timeout = arvados.Duration(100 * time.Millisecond)
        srv, listen := s.stubServer(&slowHandler{})
@@ -321,9 +355,13 @@ func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 type healthyHandler struct {
        configHash string
        configTime time.Time
+       headerDate time.Time
 }
 
 func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       if !h.headerDate.IsZero() {
+               resp.Header().Set("Date", h.headerDate.Format(time.RFC1123))
+       }
        authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
        if req.URL.Path == "/_health/ping" {
                if !authOK {
index bc57d36d04b1158cd8c6ade4191a63eed5fad354..92bd377c800041583ded6c6b161e88332639f69b 100644 (file)
@@ -20,8 +20,18 @@ var (
        command cmd.Handler = service.Command(arvados.ServiceNameHealth, newHandler)
 )
 
-func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
-       return &health.Aggregator{Cluster: cluster}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, reg *prometheus.Registry) service.Handler {
+       mClockSkew := prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "health",
+               Name:      "clock_skew_seconds",
+               Help:      "Clock skew observed in most recent health check",
+       })
+       reg.MustRegister(mClockSkew)
+       return &health.Aggregator{
+               Cluster:         cluster,
+               MetricClockSkew: mClockSkew,
+       }
 }
 
 func main() {