12260: Merge branch 'master' into 12260-system-health
authorTom Clegg <tclegg@veritasgenetics.com>
Tue, 17 Oct 2017 13:06:50 +0000 (09:06 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Tue, 17 Oct 2017 13:06:50 +0000 (09:06 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

build/run-tests.sh
sdk/go/arvados/config.go [new file with mode: 0644]
sdk/go/auth/auth.go
sdk/go/health/aggregator.go [new file with mode: 0644]
sdk/go/health/aggregator_test.go [new file with mode: 0644]
services/health/main.go [new file with mode: 0644]

index 81c6612ed54b44483c46c3309f42bddf843a5e7d..63c8801998c7224d51cdd1f232f252843d1735e2 100755 (executable)
@@ -75,6 +75,7 @@ services/arv-git-httpd
 services/crunchstat
 services/dockercleaner
 services/fuse
+services/health
 services/keep-web
 services/keepproxy
 services/keepstore
@@ -800,6 +801,7 @@ gostuff=(
     lib/crunchstat
     services/arv-git-httpd
     services/crunchstat
+    services/health
     services/keep-web
     services/keepstore
     sdk/go/keepclient
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
new file mode 100644 (file)
index 0000000..ca0df1f
--- /dev/null
@@ -0,0 +1,105 @@
+package arvados
+
+import (
+       "fmt"
+       "os"
+
+       "git.curoverse.com/arvados.git/sdk/go/config"
+)
+
+// DefaultConfigFile is the path command line tools consult when no
+// -config argument is given.
+const DefaultConfigFile = "/etc/arvados/config.yml"
+
+// Config is the top level of the site configuration: a set of
+// clusters indexed by cluster ID.
+type Config struct {
+       Clusters map[string]Cluster
+}
+
+// GetConfig returns the current system config, loading it from
+// configFile if needed.
+func GetConfig(configFile string) (*Config, error) {
+       var cfg Config
+       err := config.LoadFile(&cfg, configFile)
+       return &cfg, err
+}
+
+// GetCluster returns the cluster ID and config for the given
+// cluster, or the default/only configured cluster if clusterID is "".
+func (sc *Config) GetCluster(clusterID string) (*Cluster, error) {
+       if clusterID == "" {
+               if len(sc.Clusters) == 0 {
+                       return nil, fmt.Errorf("no clusters configured")
+               } else if len(sc.Clusters) > 1 {
+                       return nil, fmt.Errorf("multiple clusters configured, cannot choose")
+               } else {
+                       for id, cc := range sc.Clusters {
+                               cc.ClusterID = id
+                               return &cc, nil
+                       }
+               }
+       }
+       if cc, ok := sc.Clusters[clusterID]; !ok {
+               return nil, fmt.Errorf("cluster %q is not configured", clusterID)
+       } else {
+               cc.ClusterID = clusterID
+               return &cc, nil
+       }
+}
+
+// Cluster is the configuration for one Arvados cluster.
+type Cluster struct {
+       ClusterID       string `json:"-"` // filled in by GetCluster, not read from the config file
+       ManagementToken string
+       SystemNodes     map[string]SystemNode
+}
+
+// GetThisSystemNode returns a SystemNode for the node we're running
+// on right now.
+func (cc *Cluster) GetThisSystemNode() (*SystemNode, error) {
+       hostname, err := os.Hostname()
+       if err != nil {
+               return nil, err
+       }
+       return cc.GetSystemNode(hostname)
+}
+
+// GetSystemNode returns a SystemNode for the given hostname. An error
+// is returned if the appropriate configuration can't be determined
+// (e.g., this does not appear to be a system node).
+func (cc *Cluster) GetSystemNode(node string) (*SystemNode, error) {
+       if cfg, ok := cc.SystemNodes[node]; ok {
+               return &cfg, nil
+       }
+       // If node is not listed, but "*" gives a default system node
+       // config, use the default config.
+       if cfg, ok := cc.SystemNodes["*"]; ok {
+               return &cfg, nil
+       }
+       return nil, fmt.Errorf("config does not provision host %q as a system node", node)
+}
+
+// SystemNode describes which Arvados services run on one node. JSON
+// tags map each field to the service's canonical package/daemon
+// name.
+type SystemNode struct {
+       Health      SystemServiceInstance `json:"arvados-health"`
+       Keepproxy   SystemServiceInstance `json:"keepproxy"`
+       Keepstore   SystemServiceInstance `json:"keepstore"`
+       Keepweb     SystemServiceInstance `json:"keep-web"`
+       Nodemanager SystemServiceInstance `json:"arvados-node-manager"`
+       RailsAPI    SystemServiceInstance `json:"arvados-api-server"`
+       Websocket   SystemServiceInstance `json:"arvados-ws"`
+       Workbench   SystemServiceInstance `json:"arvados-workbench"`
+}
+
+// ServicePorts returns the configured listening address (or "" if
+// disabled) for each service on the node.
+func (sn *SystemNode) ServicePorts() map[string]string {
+       return map[string]string{
+               "arvados-api-server":   sn.RailsAPI.Listen,
+               "arvados-node-manager": sn.Nodemanager.Listen,
+               "arvados-workbench":    sn.Workbench.Listen,
+               "arvados-ws":           sn.Websocket.Listen,
+               "keep-web":             sn.Keepweb.Listen,
+               "keepproxy":            sn.Keepproxy.Listen,
+               "keepstore":            sn.Keepstore.Listen,
+       }
+}
+
+// SystemServiceInstance is the per-node configuration for one
+// service instance. Listen is the host:port (or :port) address the
+// service listens on; "" means the service is disabled on the node.
+type SystemServiceInstance struct {
+       Listen string
+}
index 730989b3a371ebcb2874784bbd76f668c22a6db2..ea492430e41297ddb8465c73b62c477e20af2357 100644 (file)
@@ -39,7 +39,7 @@ var DecodeTokenCookie func(string) ([]byte, error) = base64.URLEncoding.DecodeSt
 func (a *Credentials) LoadTokensFromHTTPRequest(r *http.Request) {
        // Load plain token from "Authorization: OAuth2 ..." header
        // (typically used by smart API clients)
-       if toks := strings.SplitN(r.Header.Get("Authorization"), " ", 2); len(toks) == 2 && toks[0] == "OAuth2" {
+       if toks := strings.SplitN(r.Header.Get("Authorization"), " ", 2); len(toks) == 2 && (toks[0] == "OAuth2" || toks[0] == "Bearer") {
                a.Tokens = append(a.Tokens, toks[1])
        }
 
diff --git a/sdk/go/health/aggregator.go b/sdk/go/health/aggregator.go
new file mode 100644 (file)
index 0000000..334584b
--- /dev/null
@@ -0,0 +1,218 @@
+package health
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "net"
+       "net/http"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+)
+
+// defaultTimeout is the per-ping deadline used unless the test suite
+// overrides agg.timeout.
+const defaultTimeout = arvados.Duration(2 * time.Second)
+
+// Aggregator implements http.Handler. It handles "GET /_health/all"
+// by checking the health of all configured services on the cluster
+// and responding 200 if everything is healthy.
+type Aggregator struct {
+       setupOnce  sync.Once
+       httpClient *http.Client
+       timeout    arvados.Duration
+
+       Config *arvados.Config
+
+       // If non-nil, Log is called after handling each request.
+       Log func(*http.Request, error)
+}
+
+func (agg *Aggregator) setup() {
+       agg.httpClient = http.DefaultClient
+       if agg.timeout == 0 {
+               // this is always the case, except in the test suite
+               agg.timeout = defaultTimeout
+       }
+}
+
+// ServeHTTP handles "GET /_health/all": it authenticates the request
+// against the cluster's management token, runs ClusterHealth, and
+// writes the result as JSON. Errors are reported as a JSON
+// {"error": ...} body with an appropriate status code.
+func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       agg.setupOnce.Do(agg.setup)
+       // sendErr writes an error response and logs it (if a logger
+       // is configured).
+       sendErr := func(statusCode int, err error) {
+               resp.WriteHeader(statusCode)
+               json.NewEncoder(resp).Encode(map[string]string{"error": err.Error()})
+               if agg.Log != nil {
+                       agg.Log(req, err)
+               }
+       }
+
+       resp.Header().Set("Content-Type", "application/json")
+
+       // GetCluster("") fails unless exactly one cluster is
+       // configured; the aggregator only operates on single-cluster
+       // configs.
+       cluster, err := agg.Config.GetCluster("")
+       if err != nil {
+               err = fmt.Errorf("arvados.GetCluster(): %s", err)
+               sendErr(http.StatusInternalServerError, err)
+               return
+       }
+       // NOTE(review): errUnauthorized and errNotFound are not
+       // declared in this file -- presumably elsewhere in package
+       // health; confirm.
+       if !agg.checkAuth(req, cluster) {
+               sendErr(http.StatusUnauthorized, errUnauthorized)
+               return
+       }
+       if req.URL.Path != "/_health/all" {
+               sendErr(http.StatusNotFound, errNotFound)
+               return
+       }
+       json.NewEncoder(resp).Encode(agg.ClusterHealth(cluster))
+       if agg.Log != nil {
+               agg.Log(req, nil)
+       }
+}
+
+// ClusterHealthResponse is the JSON document returned by
+// "GET /_health/all".
+type ClusterHealthResponse struct {
+       // "OK" if all needed services are OK, otherwise "ERROR".
+       Health string `json:"health"`
+
+       // An entry for each known health check of each known instance
+       // of each needed component: "instance of service S on node N
+       // reports health-check C is OK."
+       Checks map[string]CheckResult `json:"checks"`
+
+       // An entry for each service type: "service S is OK." This
+       // exposes problems that can't be expressed in Checks, like
+       // "service S is needed, but isn't configured to run
+       // anywhere."
+       Services map[string]ServiceHealth `json:"services"`
+}
+
+// CheckResult is the outcome of pinging one service instance.
+type CheckResult struct {
+       Health         string                 `json:"health"`
+       Error          string                 `json:"error,omitempty"`
+       HTTPStatusCode int                    `json:",omitempty"` // marshals under its Go field name
+       HTTPStatusText string                 `json:",omitempty"` // marshals under its Go field name
+       Response       map[string]interface{} `json:"response"`
+       ResponseTime   json.Number            `json:"responseTime"` // seconds, 6 decimal places
+}
+
+// ServiceHealth summarizes one service type across all nodes: its
+// aggregate health and the number of instances that passed their
+// checks.
+type ServiceHealth struct {
+       Health string `json:"health"`
+       N      int    `json:"n"`
+}
+
+// ClusterHealth pings every configured service address on every
+// system node (concurrently) and aggregates the results. The overall
+// Health is "OK" only if every individual check passes and every
+// known service type has at least one passing instance.
+func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResponse {
+       resp := ClusterHealthResponse{
+               Health:   "OK",
+               Checks:   make(map[string]CheckResult),
+               Services: make(map[string]ServiceHealth),
+       }
+
+       // mtx guards resp; wg tracks the per-address ping goroutines.
+       mtx := sync.Mutex{}
+       wg := sync.WaitGroup{}
+       for node, nodeConfig := range cluster.SystemNodes {
+               for svc, addr := range nodeConfig.ServicePorts() {
+                       // Ensure svc is listed in resp.Services.
+                       // Starts at ERROR; flipped to OK below when
+                       // any instance passes its check.
+                       mtx.Lock()
+                       if _, ok := resp.Services[svc]; !ok {
+                               resp.Services[svc] = ServiceHealth{Health: "ERROR"}
+                       }
+                       mtx.Unlock()
+
+                       if addr == "" {
+                               // svc is not expected on this node.
+                               continue
+                       }
+
+                       wg.Add(1)
+                       // Loop variables are passed as arguments so
+                       // each goroutine sees its own copies.
+                       go func(node, svc, addr string) {
+                               defer wg.Done()
+                               var result CheckResult
+                               url, err := agg.pingURL(node, addr)
+                               if err != nil {
+                                       result = CheckResult{
+                                               Health: "ERROR",
+                                               Error:  err.Error(),
+                                       }
+                               } else {
+                                       result = agg.ping(url, cluster)
+                               }
+
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               // Checks are keyed "service+URL".
+                               resp.Checks[svc+"+"+url] = result
+                               if result.Health == "OK" {
+                                       h := resp.Services[svc]
+                                       h.N++
+                                       h.Health = "OK"
+                                       resp.Services[svc] = h
+                               } else {
+                                       resp.Health = "ERROR"
+                               }
+                       }(node, svc, addr)
+               }
+       }
+       wg.Wait()
+
+       // Report ERROR if a needed service didn't fail any checks
+       // merely because it isn't configured to run anywhere.
+       for _, sh := range resp.Services {
+               if sh.Health != "OK" {
+                       resp.Health = "ERROR"
+                       break
+               }
+       }
+       return resp
+}
+
+func (agg *Aggregator) pingURL(node, addr string) (string, error) {
+       _, port, err := net.SplitHostPort(addr)
+       return "http://" + node + ":" + port + "/_health/ping", err
+}
+
+// ping issues an authenticated GET to the given /_health/ping URL,
+// using agg.timeout as the request deadline, and converts the
+// outcome into a CheckResult. The deferred function fills in Health,
+// Error and ResponseTime no matter which step failed.
+func (agg *Aggregator) ping(url string, cluster *arvados.Cluster) (result CheckResult) {
+       t0 := time.Now()
+
+       var err error
+       defer func() {
+               result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+               if err != nil {
+                       result.Health, result.Error = "ERROR", err.Error()
+               } else {
+                       result.Health = "OK"
+               }
+       }()
+
+       req, err := http.NewRequest("GET", url, nil)
+       if err != nil {
+               return
+       }
+       // The remote service's ping endpoint requires the same
+       // management token.
+       req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
+
+       ctx, cancel := context.WithTimeout(req.Context(), time.Duration(agg.timeout))
+       defer cancel()
+       req = req.WithContext(ctx)
+       resp, err := agg.httpClient.Do(req)
+       if err != nil {
+               return
+       }
+       // Close the body so the transport can reuse the connection
+       // (previously it was never closed, leaking connections).
+       defer resp.Body.Close()
+       result.HTTPStatusCode = resp.StatusCode
+       result.HTTPStatusText = resp.Status
+       err = json.NewDecoder(resp.Body).Decode(&result.Response)
+       if err != nil {
+               err = fmt.Errorf("cannot decode response: %s", err)
+               return
+       } else if resp.StatusCode != http.StatusOK {
+               err = fmt.Errorf("HTTP %d %s", resp.StatusCode, resp.Status)
+               return
+       }
+       return
+}
+
+func (agg *Aggregator) checkAuth(req *http.Request, cluster *arvados.Cluster) bool {
+       creds := auth.NewCredentialsFromHTTPRequest(req)
+       for _, token := range creds.Tokens {
+               if token != "" && token == cluster.ManagementToken {
+                       return true
+               }
+       }
+       return false
+}
diff --git a/sdk/go/health/aggregator_test.go b/sdk/go/health/aggregator_test.go
new file mode 100644 (file)
index 0000000..048886a
--- /dev/null
@@ -0,0 +1,192 @@
+package health
+
+import (
+       "encoding/json"
+       "fmt"
+       "net"
+       "net/http"
+       "net/http/httptest"
+       "strings"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "gopkg.in/check.v1"
+)
+
+// AggregatorSuite exercises the health Aggregator against stub
+// backend servers.
+type AggregatorSuite struct {
+       handler *Aggregator
+       req     *http.Request
+       resp    *httptest.ResponseRecorder
+}
+
+// Gocheck boilerplate
+var _ = check.Suite(&AggregatorSuite{})
+
+// TestInterface verifies at compile time that Aggregator satisfies
+// http.Handler.
+func (s *AggregatorSuite) TestInterface(c *check.C) {
+       var _ http.Handler = &Aggregator{}
+}
+
+// SetUpTest resets, before each test, the handler with a minimal
+// single-cluster config (no system nodes yet) plus a pre-authorized
+// request and a fresh response recorder.
+func (s *AggregatorSuite) SetUpTest(c *check.C) {
+       s.handler = &Aggregator{Config: &arvados.Config{
+               Clusters: map[string]arvados.Cluster{
+                       "zzzzz": {
+                               ManagementToken: arvadostest.ManagementToken,
+                               SystemNodes:     map[string]arvados.SystemNode{},
+                       },
+               },
+       }}
+       s.req = httptest.NewRequest("GET", "/_health/all", nil)
+       s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+       s.resp = httptest.NewRecorder()
+}
+
+// TestNoAuth: a request without an Authorization header is rejected
+// with 401.
+func (s *AggregatorSuite) TestNoAuth(c *check.C) {
+       s.req.Header.Del("Authorization")
+       s.handler.ServeHTTP(s.resp, s.req)
+       s.checkError(c)
+       c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
+}
+
+// TestBadAuth: a malformed/incorrect Authorization header is
+// rejected with 401.
+func (s *AggregatorSuite) TestBadAuth(c *check.C) {
+       s.req.Header.Set("Authorization", "xyzzy")
+       s.handler.ServeHTTP(s.resp, s.req)
+       s.checkError(c)
+       c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
+}
+
+// TestEmptyConfig: with no system nodes configured there are no
+// checks and no needed services, so the cluster reports OK.
+func (s *AggregatorSuite) TestEmptyConfig(c *check.C) {
+       s.handler.ServeHTTP(s.resp, s.req)
+       s.checkOK(c)
+}
+
+func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
+       srv := httptest.NewServer(handler)
+       var port string
+       if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
+               panic(srv.URL)
+       } else {
+               port = parts[len(parts)-1]
+       }
+       return srv, ":" + port
+}
+
+// unhealthyHandler stubs a service whose health check reports ERROR
+// (with HTTP status 200).
+type unhealthyHandler struct{}
+
+func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       if req.URL.Path != "/_health/ping" {
+               http.Error(resp, "not found", http.StatusNotFound)
+               return
+       }
+       resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
+}
+
+func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
+       srv, listen := s.stubServer(&unhealthyHandler{})
+       defer srv.Close()
+       s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+               Keepstore: arvados.Keepstore{Listen: listen},
+       }
+       s.handler.ServeHTTP(s.resp, s.req)
+       s.checkUnhealthy(c)
+}
+
+// healthyHandler stubs a service whose health check reports OK.
+type healthyHandler struct{}
+
+func (*healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       if req.URL.Path != "/_health/ping" {
+               http.Error(resp, "not found", http.StatusNotFound)
+               return
+       }
+       resp.Write([]byte(`{"health":"OK"}`))
+}
+
+func (s *AggregatorSuite) TestHealthy(c *check.C) {
+       srv, listen := s.stubServer(&healthyHandler{})
+       defer srv.Close()
+       _, port, _ := net.SplitHostPort(listen)
+       s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+               Keepstore: arvados.Keepstore{Listen: listen},
+       }
+       s.handler.ServeHTTP(s.resp, s.req)
+       resp := s.checkOK(c)
+       ep := resp.Checks[fmt.Sprintf("keepstore+http://localhost:%d/_health/ping", port)]
+       c.Check(ep.Health, check.Equals, "OK")
+       c.Check(ep.Status, check.Equals, 200)
+}
+
+func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
+       srvH, listenH := s.stubServer(&healthyHandler{})
+       defer srvH.Close()
+       _, portH, _ := net.SplitHostPort(listenH)
+       srvU, listenU := s.stubServer(&unhealthyHandler{})
+       defer srvU.Close()
+       _, portU, _ := net.SplitHostPort(listenU)
+       s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+               Keepstore: arvados.Keepstore{Listen: listenH},
+       }
+       s.handler.Config.Clusters["zzzzz"].SystemNodes["127.0.0.1"] = arvados.SystemNode{
+               Keepstore: arvados.Keepstore{Listen: listenU},
+       }
+       s.handler.ServeHTTP(s.resp, s.req)
+       resp := s.checkUnhealthy(c)
+       ep := resp.Checks[fmt.Sprintf("keepstore+http://localhost:%d/_health/ping", portH)]
+       c.Check(ep.Health, check.Equals, "OK")
+       c.Check(ep.Status, check.Equals, 200)
+       ep = resp.Checks[fmt.Sprintf("keepstore+http://127.0.0.1:%d/_health/ping", portU)]
+       c.Check(ep.Health, check.Equals, "ERROR")
+       c.Check(ep.Status, check.Equals, 200)
+}
+
+// checkError asserts a non-200 response whose decoded body reports a
+// non-OK overall health.
+func (s *AggregatorSuite) checkError(c *check.C) {
+       c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
+       var resp ClusterHealthResponse
+       err := json.NewDecoder(s.resp.Body).Decode(&resp)
+       c.Check(err, check.IsNil)
+       c.Check(resp.Health, check.Not(check.Equals), "OK")
+}
+
+// checkUnhealthy asserts a 200 response reporting overall "ERROR"
+// and returns the decoded body.
+func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse {
+       return s.checkResult(c, "ERROR")
+}
+
+// checkOK asserts a 200 response reporting overall "OK" and returns
+// the decoded body.
+func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse {
+       return s.checkResult(c, "OK")
+}
+
+// checkResult asserts a 200 response whose body decodes to a
+// ClusterHealthResponse with the given overall health, and returns
+// the decoded body for further inspection.
+func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
+       c.Check(s.resp.Code, check.Equals, http.StatusOK)
+       var resp ClusterHealthResponse
+       err := json.NewDecoder(s.resp.Body).Decode(&resp)
+       c.Check(err, check.IsNil)
+       c.Check(resp.Health, check.Equals, health)
+       return resp
+}
+
+// slowHandler stubs a healthy service that takes 3s to answer its
+// ping -- long enough to trip a short aggregator timeout.
+type slowHandler struct{}
+
+func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       if req.URL.Path != "/_health/ping" {
+               http.Error(resp, "not found", http.StatusNotFound)
+               return
+       }
+       time.Sleep(3 * time.Second)
+       resp.Write([]byte(`{"health":"OK"}`))
+}
+
+func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
+       s.handler.timeout = arvados.Duration(100 * time.Millisecond)
+       srv, listen := s.stubServer(&slowHandler{})
+       defer srv.Close()
+       s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+               Keepstore: arvados.Keepstore{Listen: listen},
+       }
+       s.handler.ServeHTTP(s.resp, s.req)
+       resp := s.checkUnhealthy(c)
+       ep := resp.Checks["localhost/keepstore/_health/ping"]
+       c.Check(ep.Health, check.Equals, "ERROR")
+       c.Check(ep.Status, check.Equals, 0)
+       rt, err := ep.ResponseTime.Float64()
+       c.Check(err, check.IsNil)
+       c.Check(rt > 0.005, check.Equals, true)
+}
diff --git a/services/health/main.go b/services/health/main.go
new file mode 100644 (file)
index 0000000..b6358de
--- /dev/null
@@ -0,0 +1,55 @@
+package main
+
+import (
+       "flag"
+       "net/http"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/health"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       log "github.com/Sirupsen/logrus"
+)
+
+// main starts the arvados health-aggregator service: it loads the
+// site configuration, locates this host's system-node config, and
+// serves a health.Aggregator on the configured listen address.
+func main() {
+       configFile := flag.String("config", arvados.DefaultConfigFile, "`path` to arvados configuration file")
+       flag.Parse()
+
+       log.SetFormatter(&log.JSONFormatter{
+               TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+       })
+       cfg, err := arvados.GetConfig(*configFile)
+       if err != nil {
+               log.Fatal(err)
+       }
+       // GetCluster("") fails unless exactly one cluster is
+       // configured.
+       clusterCfg, err := cfg.GetCluster("")
+       if err != nil {
+               log.Fatal(err)
+       }
+       nodeCfg, err := clusterCfg.GetThisSystemNode()
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       // Deliberately shadows the package-level logger with one
+       // carrying the Service field.
+       log := log.WithField("Service", "Health")
+       srv := &httpserver.Server{
+               Addr: nodeCfg.Health.Listen,
+               Server: http.Server{
+                       Handler: &health.Aggregator{
+                               Config: cfg,
+                               Log: func(req *http.Request, err error) {
+                                       log.WithField("RemoteAddr", req.RemoteAddr).
+                                               WithField("Path", req.URL.Path).
+                                               WithError(err).
+                                               Info("HTTP request")
+                               },
+                       },
+               },
+       }
+       if err := srv.Start(); err != nil {
+               log.Fatal(err)
+       }
+       log.WithField("Listen", srv.Addr).Info("listening")
+       if err := srv.Wait(); err != nil {
+               log.Fatal(err)
+       }
+}