12260: Improve data structures.
author Tom Clegg <tclegg@veritasgenetics.com>
Tue, 3 Oct 2017 02:36:35 +0000 (22:36 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Tue, 3 Oct 2017 02:36:35 +0000 (22:36 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/health/aggregator.go
sdk/go/health/aggregator_test.go

index 5c46c1aa710ffc53c3cc7e5ada32c4421eae571d..88a338ef49dae640bb69835690c646499aa48dae 100644 (file)
@@ -72,28 +72,39 @@ func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
                sendErr(http.StatusNotFound, errNotFound)
                return
        }
-       json.NewEncoder(resp).Encode(agg.checkClusterHealth(cluster))
+       json.NewEncoder(resp).Encode(agg.ClusterHealth(cluster))
        if agg.Log != nil {
                agg.Log(req, nil)
        }
 }
 
-type serviceHealth struct {
+type ServiceHealth struct {
        Health string `json:"health"`
        N      int    `json:"n"`
 }
 
-type clusterHealthResponse struct {
-       Health    string                            `json:"health"`
-       Endpoints map[string]map[string]interface{} `json:"endpoints"`
-       Services  map[string]serviceHealth          `json:"services"`
+type ClusterHealthResponse struct {
+       Health   string                   `json:"health"`
+       Checks   map[string]CheckResponse `json:"checks"`
+       Services map[string]ServiceHealth `json:"services"`
 }
 
-func (agg *Aggregator) checkClusterHealth(cluster *arvados.Cluster) clusterHealthResponse {
-       resp := clusterHealthResponse{
-               Health:    "OK",
-               Endpoints: make(map[string]map[string]interface{}),
-               Services:  make(map[string]serviceHealth),
+type CheckResponse struct {
+       Status       int         `json:"status"`
+       Health       string      `json:"health"`
+       Error        string      `json:"error,omitempty"`
+       ResponseTime json.Number `json:"responseTime"`
+}
+
+func (r *CheckResponse) OK() bool {
+       return r.Health == "OK" && r.Status == http.StatusOK
+}
+
+func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResponse {
+       resp := ClusterHealthResponse{
+               Health:   "OK",
+               Checks:   make(map[string]CheckResponse),
+               Services: make(map[string]ServiceHealth),
        }
 
        mtx := sync.Mutex{}
@@ -106,21 +117,21 @@ func (agg *Aggregator) checkClusterHealth(cluster *arvados.Cluster) clusterHealt
                                continue
                        }
                        wg.Add(1)
-                       go func() {
+                       go func(node string) {
                                defer wg.Done()
                                pingResp := agg.ping(node, addr)
 
                                mtx.Lock()
                                defer mtx.Unlock()
-                               resp.Endpoints[node+"/"+svc+"/_health/ping"] = pingResp
+                               resp.Checks[node+"/"+svc+"/_health/ping"] = pingResp
                                svHealth := resp.Services[svc]
-                               if agg.isOK(pingResp) {
+                               if pingResp.OK() {
                                        svHealth.N++
                                } else {
                                        resp.Health = "ERROR"
                                }
                                resp.Services[svc] = svHealth
-                       }()
+                       }(node)
                }
        }
        wg.Wait()
@@ -137,20 +148,14 @@ func (agg *Aggregator) checkClusterHealth(cluster *arvados.Cluster) clusterHealt
        return resp
 }
 
-func (agg *Aggregator) isOK(result map[string]interface{}) bool {
-       h, ok := result["health"].(string)
-       return ok && h == "OK"
-}
-
-func (agg *Aggregator) ping(node, addr string) (result map[string]interface{}) {
+func (agg *Aggregator) ping(node, addr string) (result CheckResponse) {
        t0 := time.Now()
-       result = make(map[string]interface{})
 
        var err error
        defer func() {
-               result["responseTime"] = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+               result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
                if err != nil {
-                       result["health"], result["error"] = "ERROR", err
+                       result.Health, result.Error = "ERROR", err.Error()
                }
        }()
 
@@ -176,11 +181,12 @@ func (agg *Aggregator) ping(node, addr string) (result map[string]interface{}) {
        if err != nil {
                return
        }
-       err = json.NewDecoder(resp.Body).Decode(result)
+       result.Status = resp.StatusCode
+       err = json.NewDecoder(resp.Body).Decode(&result)
        if err != nil {
                return
        }
-       if resp.StatusCode != 200 {
+       if resp.StatusCode != http.StatusOK {
                err = fmt.Errorf("HTTP %d %s", resp.StatusCode, resp.Status)
                return
        }
index eb6adc8a2e912fc0e43c8faa0f3013c9950a6f00..b66671b1e16f1c0839f9aa1165b1b042f6eb7c91 100644 (file)
@@ -5,6 +5,7 @@ import (
        "net/http"
        "net/http/httptest"
        "strings"
+       "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
@@ -57,49 +58,130 @@ func (s *AggregatorSuite) TestEmptyConfig(c *check.C) {
        s.checkOK(c)
 }
 
+func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
+       srv := httptest.NewServer(handler)
+       var port string
+       if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
+               panic(srv.URL)
+       } else {
+               port = parts[len(parts)-1]
+       }
+       return srv, ":" + port
+}
+
 type unhealthyHandler struct{}
 
 func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-       resp.Write([]byte(`{"health":"ERROR"}`))
+       if req.URL.Path == "/_health/ping" {
+               resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
+       } else {
+               http.Error(resp, "not found", http.StatusNotFound)
+       }
 }
 
 func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
-       srv := httptest.NewServer(&unhealthyHandler{})
+       srv, listen := s.stubServer(&unhealthyHandler{})
        defer srv.Close()
+       s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+               Keepstore: arvados.Keepstore{Listen: listen},
+       }
+       s.handler.ServeHTTP(s.resp, s.req)
+       s.checkUnhealthy(c)
+}
 
-       var port string
-       if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
-               panic(srv.URL)
+type healthyHandler struct{}
+
+func (*healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       if req.URL.Path == "/_health/ping" {
+               resp.Write([]byte(`{"health":"OK"}`))
        } else {
-               port = parts[len(parts)-1]
+               http.Error(resp, "not found", http.StatusNotFound)
+       }
+}
+
+func (s *AggregatorSuite) TestHealthy(c *check.C) {
+       srv, listen := s.stubServer(&healthyHandler{})
+       defer srv.Close()
+       s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+               Keepstore: arvados.Keepstore{Listen: listen},
        }
+       s.handler.ServeHTTP(s.resp, s.req)
+       resp := s.checkOK(c)
+       ep := resp.Checks["localhost/keepstore/_health/ping"]
+       c.Check(ep.Health, check.Equals, "OK")
+       c.Check(ep.Status, check.Equals, 200)
+}
+
+func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
+       srvH, listenH := s.stubServer(&healthyHandler{})
+       defer srvH.Close()
+       srvU, listenU := s.stubServer(&unhealthyHandler{})
+       defer srvU.Close()
        s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
-               Keepstore: arvados.Keepstore{Listen: ":" + port},
+               Keepstore: arvados.Keepstore{Listen: listenH},
+       }
+       s.handler.Config.Clusters["zzzzz"].SystemNodes["127.0.0.1"] = arvados.SystemNode{
+               Keepstore: arvados.Keepstore{Listen: listenU},
        }
        s.handler.ServeHTTP(s.resp, s.req)
-       s.checkUnhealthy(c)
+       resp := s.checkUnhealthy(c)
+       ep := resp.Checks["localhost/keepstore/_health/ping"]
+       c.Check(ep.Health, check.Equals, "OK")
+       c.Check(ep.Status, check.Equals, 200)
+       ep = resp.Checks["127.0.0.1/keepstore/_health/ping"]
+       c.Check(ep.Health, check.Equals, "ERROR")
+       c.Check(ep.Status, check.Equals, 200)
 }
 
 func (s *AggregatorSuite) checkError(c *check.C) {
        c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
-       var body map[string]interface{}
-       err := json.NewDecoder(s.resp.Body).Decode(&body)
+       var resp ClusterHealthResponse
+       err := json.NewDecoder(s.resp.Body).Decode(&resp)
        c.Check(err, check.IsNil)
-       c.Check(body["health"], check.Not(check.Equals), "OK")
+       c.Check(resp.Health, check.Not(check.Equals), "OK")
+}
+
+func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse {
+       return s.checkResult(c, "ERROR")
+}
+
+func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse {
+       return s.checkResult(c, "OK")
 }
 
-func (s *AggregatorSuite) checkUnhealthy(c *check.C) {
+func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
        c.Check(s.resp.Code, check.Equals, http.StatusOK)
-       var body map[string]interface{}
-       err := json.NewDecoder(s.resp.Body).Decode(&body)
+       var resp ClusterHealthResponse
+       err := json.NewDecoder(s.resp.Body).Decode(&resp)
        c.Check(err, check.IsNil)
-       c.Check(body["health"], check.Equals, "ERROR")
+       c.Check(resp.Health, check.Equals, health)
+       return resp
 }
 
-func (s *AggregatorSuite) checkOK(c *check.C) {
-       c.Check(s.resp.Code, check.Equals, http.StatusOK)
-       var body map[string]interface{}
-       err := json.NewDecoder(s.resp.Body).Decode(&body)
+type slowHandler struct{}
+
+func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       if req.URL.Path == "/_health/ping" {
+               time.Sleep(3 * time.Second)
+               resp.Write([]byte(`{"health":"OK"}`))
+       } else {
+               http.Error(resp, "not found", http.StatusNotFound)
+       }
+}
+
+func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
+       s.handler.timeout = arvados.Duration(100 * time.Millisecond)
+       srv, listen := s.stubServer(&slowHandler{})
+       defer srv.Close()
+       s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+               Keepstore: arvados.Keepstore{Listen: listen},
+       }
+       s.handler.ServeHTTP(s.resp, s.req)
+       resp := s.checkUnhealthy(c)
+       ep := resp.Checks["localhost/keepstore/_health/ping"]
+       c.Check(ep.Health, check.Equals, "ERROR")
+       c.Check(ep.Status, check.Equals, 0)
+       rt, err := ep.ResponseTime.Float64()
        c.Check(err, check.IsNil)
-       c.Check(body["health"], check.Equals, "OK")
+       c.Check(rt > 0.005, check.Equals, true)
 }