sendErr(http.StatusNotFound, errNotFound)
return
}
- json.NewEncoder(resp).Encode(agg.checkClusterHealth(cluster))
+ json.NewEncoder(resp).Encode(agg.ClusterHealth(cluster))
if agg.Log != nil {
agg.Log(req, nil)
}
}
-type serviceHealth struct {
+type ServiceHealth struct { // ServiceHealth is one service's entry in the cluster health report; N counts its checks that reported OK.
Health string `json:"health"`
N int `json:"n"`
}
-type clusterHealthResponse struct {
- Health string `json:"health"`
- Endpoints map[string]map[string]interface{} `json:"endpoints"`
- Services map[string]serviceHealth `json:"services"`
+type ClusterHealthResponse struct { // ClusterHealthResponse is the JSON-encoded result returned by ClusterHealth.
+ Health string `json:"health"` // starts as "OK"; set to "ERROR" as soon as any check is not OK
+ Checks map[string]CheckResponse `json:"checks"` // keyed by node+"/"+svc+"/_health/ping"
+ Services map[string]ServiceHealth `json:"services"` // keyed by svc
}
-func (agg *Aggregator) checkClusterHealth(cluster *arvados.Cluster) clusterHealthResponse {
- resp := clusterHealthResponse{
- Health: "OK",
- Endpoints: make(map[string]map[string]interface{}),
- Services: make(map[string]serviceHealth),
+type CheckResponse struct { // CheckResponse is the outcome of pinging a single health endpoint.
+ Status int `json:"status"` // HTTP status code of the ping response; left at 0 when ping fails before recording one
+ Health string `json:"health"` // decoded from the ping body; forced to "ERROR" when ping ends with an error
+ Error string `json:"error,omitempty"` // error text; omitted from the JSON output when empty
+ ResponseTime json.Number `json:"responseTime"` // elapsed seconds for the ping, formatted with six decimals
+}
+
+func (r *CheckResponse) OK() bool { // OK reports whether the check passed: Health is "OK" and Status is 200.
+ return r.Health == "OK" && r.Status == http.StatusOK
+}
+
+func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResponse { // ClusterHealth runs the pings in goroutines, waits for all of them, and returns the combined report.
+ resp := ClusterHealthResponse{
+ Health: "OK", // optimistic default; any failed check below flips it to "ERROR"
+ Checks: make(map[string]CheckResponse),
+ Services: make(map[string]ServiceHealth),
}
mtx := sync.Mutex{}
continue
}
wg.Add(1)
- go func() {
+ go func(node string) { // NOTE(review): only node is passed in; svc and addr are still captured by the closure. If they are loop variables (loop header not visible here), they may need the same treatment on Go < 1.22 — confirm.
defer wg.Done()
pingResp := agg.ping(node, addr)
mtx.Lock()
defer mtx.Unlock()
- resp.Endpoints[node+"/"+svc+"/_health/ping"] = pingResp
+ resp.Checks[node+"/"+svc+"/_health/ping"] = pingResp // resp is shared by all goroutines, hence the mutex held above
svHealth := resp.Services[svc]
- if agg.isOK(pingResp) {
+ if pingResp.OK() { // count the passing check; otherwise mark the whole report as "ERROR"
svHealth.N++
} else {
resp.Health = "ERROR"
}
resp.Services[svc] = svHealth
- }()
+ }(node)
}
}
wg.Wait()
return resp
}
-func (agg *Aggregator) isOK(result map[string]interface{}) bool {
- h, ok := result["health"].(string)
- return ok && h == "OK"
-}
-
-func (agg *Aggregator) ping(node, addr string) (result map[string]interface{}) {
+func (agg *Aggregator) ping(node, addr string) (result CheckResponse) { // ping performs one health check and returns its result; failures are reported inside result, not as a separate error value.
t0 := time.Now()
- result = make(map[string]interface{})
var err error
defer func() {
- result["responseTime"] = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+ result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds())) // runs on every return path, so it overrides any responseTime decoded from the body
if err != nil {
- result["health"], result["error"] = "ERROR", err
+ result.Health, result.Error = "ERROR", err.Error() // any error recorded in err turns the check into an "ERROR" result
}
}()
if err != nil {
return
}
- err = json.NewDecoder(resp.Body).Decode(result)
+ result.Status = resp.StatusCode // NOTE(review): set before decoding, so a "status" key in the body would overwrite it — confirm this is intended
+ err = json.NewDecoder(resp.Body).Decode(&result) // decoded before the status check below, so a non-JSON error body is reported as a decode error
if err != nil {
return
}
- if resp.StatusCode != 200 {
+ if resp.StatusCode != http.StatusOK { // NOTE(review): assuming resp is *http.Response, resp.Status already starts with the numeric code (e.g. "404 Not Found"), so the message below likely repeats it — confirm
err = fmt.Errorf("HTTP %d %s", resp.StatusCode, resp.Status)
return
}
"net/http"
"net/http/httptest"
"strings"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
s.checkOK(c)
}
+func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) { // stubServer starts an httptest server for handler and returns it with a ":port" listen address.
+ srv := httptest.NewServer(handler)
+ var port string
+ if parts := strings.Split(srv.URL, ":"); len(parts) < 3 { // a URL with scheme, host and port splits into at least three pieces on ":"
+ panic(srv.URL)
+ } else {
+ port = parts[len(parts)-1] // last piece is taken as the port
+ }
+ return srv, ":" + port
+}
+
type unhealthyHandler struct{}
func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- resp.Write([]byte(`{"health":"ERROR"}`))
+ if req.URL.Path == "/_health/ping" { // only the ping path is answered; every other path gets a 404
+ resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`)) // no explicit WriteHeader, so the unhealthy body is sent with the default 200 status
+ } else {
+ http.Error(resp, "not found", http.StatusNotFound)
+ }
}
func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
- srv := httptest.NewServer(&unhealthyHandler{})
+ srv, listen := s.stubServer(&unhealthyHandler{}) // stub whose ping always answers {"health":"ERROR",...}
defer srv.Close()
+ s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+ Keepstore: arvados.Keepstore{Listen: listen}, // configure the stub's port as the keepstore listen address on node "localhost"
+ }
+ s.handler.ServeHTTP(s.resp, s.req)
+ s.checkUnhealthy(c) // the aggregate report must be HTTP 200 with health "ERROR"
+}
- var port string
- if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
- panic(srv.URL)
+type healthyHandler struct{} // healthyHandler is a stub service whose ping endpoint always reports OK.
+
+func (*healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if req.URL.Path == "/_health/ping" { // only the ping path is answered; every other path gets a 404
+ resp.Write([]byte(`{"health":"OK"}`))
} else {
- port = parts[len(parts)-1]
+ http.Error(resp, "not found", http.StatusNotFound)
+ }
+}
+
+func (s *AggregatorSuite) TestHealthy(c *check.C) { // TestHealthy checks that a single healthy keepstore stub yields an OK report.
+ srv, listen := s.stubServer(&healthyHandler{})
+ defer srv.Close()
+ s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+ Keepstore: arvados.Keepstore{Listen: listen},
}
+ s.handler.ServeHTTP(s.resp, s.req)
+ resp := s.checkOK(c)
+ ep := resp.Checks["localhost/keepstore/_health/ping"] // Checks keys have the form node/svc/_health/ping
+ c.Check(ep.Health, check.Equals, "OK")
+ c.Check(ep.Status, check.Equals, 200)
+}
+
+func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) { // TestHealthyAndUnhealthy checks that one failing node makes the whole report "ERROR" while per-check results stay distinct.
+ srvH, listenH := s.stubServer(&healthyHandler{})
+ defer srvH.Close()
+ srvU, listenU := s.stubServer(&unhealthyHandler{})
+ defer srvU.Close()
s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
- Keepstore: arvados.Keepstore{Listen: ":" + port},
+ Keepstore: arvados.Keepstore{Listen: listenH}, // node "localhost" uses the healthy stub's port
+ }
+ s.handler.Config.Clusters["zzzzz"].SystemNodes["127.0.0.1"] = arvados.SystemNode{
+ Keepstore: arvados.Keepstore{Listen: listenU}, // node "127.0.0.1" uses the unhealthy stub's port
}
s.handler.ServeHTTP(s.resp, s.req)
- s.checkUnhealthy(c)
+ resp := s.checkUnhealthy(c)
+ ep := resp.Checks["localhost/keepstore/_health/ping"]
+ c.Check(ep.Health, check.Equals, "OK")
+ c.Check(ep.Status, check.Equals, 200)
+ ep = resp.Checks["127.0.0.1/keepstore/_health/ping"]
+ c.Check(ep.Health, check.Equals, "ERROR")
+ c.Check(ep.Status, check.Equals, 200) // unhealthyHandler answers with HTTP 200; only the body says ERROR
}
func (s *AggregatorSuite) checkError(c *check.C) {
c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
- var body map[string]interface{}
- err := json.NewDecoder(s.resp.Body).Decode(&body)
+ var resp ClusterHealthResponse // the error body is decoded into the typed response; only its health field is checked
+ err := json.NewDecoder(s.resp.Body).Decode(&resp)
c.Check(err, check.IsNil)
- c.Check(body["health"], check.Not(check.Equals), "OK")
+ c.Check(resp.Health, check.Not(check.Equals), "OK") // a non-200 response must not claim health "OK"
+}
+
+func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse { // checkUnhealthy asserts an HTTP 200 report with health "ERROR" and returns the decoded report.
+ return s.checkResult(c, "ERROR")
+}
+
+func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse { // checkOK asserts an HTTP 200 report with health "OK" and returns the decoded report.
+ return s.checkResult(c, "OK")
}
-func (s *AggregatorSuite) checkUnhealthy(c *check.C) {
+func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse { // checkResult asserts an HTTP 200 response whose decoded health equals the health argument, and returns the decoded report.
c.Check(s.resp.Code, check.Equals, http.StatusOK)
- var body map[string]interface{}
- err := json.NewDecoder(s.resp.Body).Decode(&body)
+ var resp ClusterHealthResponse
+ err := json.NewDecoder(s.resp.Body).Decode(&resp) // consumes the recorded body
c.Check(err, check.IsNil)
- c.Check(body["health"], check.Equals, "ERROR")
+ c.Check(resp.Health, check.Equals, health)
+ return resp // returned so callers can inspect individual Checks entries
}
-func (s *AggregatorSuite) checkOK(c *check.C) {
- c.Check(s.resp.Code, check.Equals, http.StatusOK)
- var body map[string]interface{}
- err := json.NewDecoder(s.resp.Body).Decode(&body)
+type slowHandler struct{} // slowHandler is a stub service whose ping endpoint answers OK only after a 3-second delay.
+
+func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if req.URL.Path == "/_health/ping" { // only the ping path is answered; every other path gets a 404
+ time.Sleep(3 * time.Second) // far longer than the 100ms timeout set in TestPingTimeout
+ resp.Write([]byte(`{"health":"OK"}`))
+ } else {
+ http.Error(resp, "not found", http.StatusNotFound)
+ }
+}
+
+func (s *AggregatorSuite) TestPingTimeout(c *check.C) { // TestPingTimeout checks that a ping slower than the handler's timeout is reported as an ERROR check with no HTTP status.
+ s.handler.timeout = arvados.Duration(100 * time.Millisecond)
+ srv, listen := s.stubServer(&slowHandler{})
+ defer srv.Close() // NOTE(review): httptest.Server.Close waits for outstanding requests, so this likely makes the test take the handler's full 3s sleep — confirm
+ s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+ Keepstore: arvados.Keepstore{Listen: listen},
+ }
+ s.handler.ServeHTTP(s.resp, s.req)
+ resp := s.checkUnhealthy(c)
+ ep := resp.Checks["localhost/keepstore/_health/ping"]
+ c.Check(ep.Health, check.Equals, "ERROR")
+ c.Check(ep.Status, check.Equals, 0) // expects no status to have been recorded for the timed-out ping
+ rt, err := ep.ResponseTime.Float64()
c.Check(err, check.IsNil)
- c.Check(body["health"], check.Equals, "OK")
+ c.Check(rt > 0.005, check.Equals, true) // loose lower bound (5ms) on the recorded response time, well under the 100ms timeout
}