12260: Health check aggregator.
author Tom Clegg <tclegg@veritasgenetics.com>
Fri, 29 Sep 2017 03:28:13 +0000 (23:28 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Mon, 2 Oct 2017 20:59:06 +0000 (16:59 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

build/run-tests.sh
sdk/go/arvados/config.go [new file with mode: 0644]
sdk/go/auth/auth.go
sdk/go/health/aggregator.go [new file with mode: 0644]
sdk/go/health/aggregator_test.go [new file with mode: 0644]
services/health/main.go [new file with mode: 0644]

index 81c6612ed54b44483c46c3309f42bddf843a5e7d..63c8801998c7224d51cdd1f232f252843d1735e2 100755 (executable)
@@ -75,6 +75,7 @@ services/arv-git-httpd
 services/crunchstat
 services/dockercleaner
 services/fuse
+services/health
 services/keep-web
 services/keepproxy
 services/keepstore
@@ -800,6 +801,7 @@ gostuff=(
     lib/crunchstat
     services/arv-git-httpd
     services/crunchstat
+    services/health
     services/keep-web
     services/keepstore
     sdk/go/keepclient
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
new file mode 100644 (file)
index 0000000..537de2a
--- /dev/null
@@ -0,0 +1,87 @@
+package arvados
+
+import (
+       "fmt"
+       "os"
+       "strings"
+
+       "git.curoverse.com/arvados.git/sdk/go/config"
+)
+
+// Config is the top-level system configuration, as loaded by
+// GetConfig.
+type Config struct {
+       // Clusters maps each cluster ID to that cluster's
+       // configuration (see GetCluster).
+       Clusters map[string]Cluster
+}
+
+// GetConfig loads the system config from /etc/arvados/config.yml and
+// returns it. The file is read on every call; nothing is cached. On
+// error the returned *Config is nil, so a half-loaded config can never
+// be mistaken for a valid one.
+func GetConfig() (*Config, error) {
+       var cfg Config
+       err := config.LoadFile(&cfg, "/etc/arvados/config.yml")
+       if err != nil {
+               return nil, err
+       }
+       return &cfg, nil
+}
+
+// GetCluster returns the cluster ID and config for the given
+// cluster, or the default/only configured cluster if clusterID is "".
+func (sc *Config) GetCluster(clusterID string) (*Cluster, error) {
+       if clusterID == "" {
+               if len(sc.Clusters) != 1 {
+                       return nil, fmt.Errorf("multiple clusters configured, cannot choose")
+               } else {
+                       for id, cc := range sc.Clusters {
+                               cc.ClusterID = id
+                               return &cc, nil
+                       }
+               }
+       }
+       if cc, ok := sc.Clusters[clusterID]; !ok {
+               return nil, fmt.Errorf("cluster %q is not configured", clusterID)
+       } else {
+               cc.ClusterID = clusterID
+               return &cc, nil
+       }
+}
+
+// Cluster holds the configuration for a single cluster.
+type Cluster struct {
+       // ClusterID is not read from the config file (json:"-"); it is
+       // filled in by Config.GetCluster from the Clusters map key.
+       ClusterID       string `json:"-"`
+       // ManagementToken is compared against the tokens supplied
+       // with requests to the health aggregator.
+       ManagementToken string
+       // SystemNodes maps host names (or "*" for a default entry) to
+       // per-node service config; see GetSystemNode.
+       SystemNodes     map[string]SystemNode
+}
+
+// GetThisSystemNodeConfig returns a SystemNode for the node we're
+// running on right now.
+func (cc *Cluster) GetThisSystemNode() (*SystemNode, error) {
+       hostname, err := os.Hostname()
+       if err != nil {
+               return nil, err
+       }
+       return cc.GetSystemNode(hostname)
+}
+
+// GetSystemNodeConfig returns a NodeConfig for the given node. An
+// error is returned if the appropriate configuration can't be
+// determined (e.g., this does not appear to be a system node).
+func (cc *Cluster) GetSystemNode(node string) (*SystemNode, error) {
+       // Generally node is "a.b.ca", use the first of {"a.b.ca",
+       // "a.b", "a"} that has an entry in SystemNodes.
+       labels := strings.Split(node, ".")
+       for j := len(labels); j > 0; j-- {
+               hostpart := strings.Join(labels[:j], ".")
+               if cfg, ok := cc.SystemNodes[hostpart]; ok {
+                       return &cfg, nil
+               }
+       }
+       // If node is not listed, but "*" gives a default system node
+       // config, use the default config.
+       if cfg, ok := cc.SystemNodes["*"]; ok {
+               return &cfg, nil
+       }
+       return nil, fmt.Errorf("config does not provision host %q as a system node", node)
+}
+
+// SystemNode describes the services configured on one node.
+type SystemNode struct {
+       Keepstore Keepstore
+}
+
+// Keepstore holds the keepstore settings for a node.
+type Keepstore struct {
+       // Listen is keepstore's listen address. The health aggregator
+       // uses only its port (pinging the node name at that port), and
+       // skips the node's keepstore entirely when Listen is "".
+       Listen string
+}
index 730989b3a371ebcb2874784bbd76f668c22a6db2..ea492430e41297ddb8465c73b62c477e20af2357 100644 (file)
@@ -39,7 +39,7 @@ var DecodeTokenCookie func(string) ([]byte, error) = base64.URLEncoding.DecodeSt
 func (a *Credentials) LoadTokensFromHTTPRequest(r *http.Request) {
        // Load plain token from "Authorization: OAuth2 ..." header
        // (typically used by smart API clients)
-       if toks := strings.SplitN(r.Header.Get("Authorization"), " ", 2); len(toks) == 2 && toks[0] == "OAuth2" {
+       if toks := strings.SplitN(r.Header.Get("Authorization"), " ", 2); len(toks) == 2 && (toks[0] == "OAuth2" || toks[0] == "Bearer") {
                a.Tokens = append(a.Tokens, toks[1])
        }
 
diff --git a/sdk/go/health/aggregator.go b/sdk/go/health/aggregator.go
new file mode 100644 (file)
index 0000000..6993187
--- /dev/null
@@ -0,0 +1,197 @@
+package health
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "net"
+       "net/http"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+)
+
+// defaultTimeout bounds each individual service ping when no other
+// timeout has been set (see setup).
+const defaultTimeout = arvados.Duration(2 * time.Second)
+
+// Aggregator implements http.Handler. It handles "GET /_health/all"
+// by checking the health of all configured services on the cluster
+// and responding with a JSON report whose "health" field is "OK" only
+// if every check succeeded. The report itself is sent with status 200
+// either way; auth, config, and path errors use other statuses.
+type Aggregator struct {
+       // setupOnce is meant to run setup, which fills in httpClient
+       // and timeout, exactly once.
+       setupOnce  sync.Once
+       httpClient *http.Client
+       timeout    arvados.Duration
+
+       // Config is the system configuration. If nil, ServeHTTP loads
+       // it with arvados.GetConfig when a request arrives.
+       Config *arvados.Config
+
+       // If non-nil, Log is called after handling each request.
+       Log func(*http.Request, error)
+}
+
+func (agg *Aggregator) setup() {
+       agg.httpClient = http.DefaultClient
+       if agg.timeout == 0 {
+               // this is always the case, except in the test suite
+               agg.timeout = defaultTimeout
+       }
+}
+
+func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       sendErr := func(statusCode int, err error) {
+               resp.WriteHeader(statusCode)
+               json.NewEncoder(resp).Encode(map[string]interface{}{"error": err})
+               if agg.Log != nil {
+                       agg.Log(req, err)
+               }
+       }
+
+       resp.Header().Set("Content-Type", "application/json")
+
+       if agg.Config == nil {
+               cfg, err := arvados.GetConfig()
+               if err != nil {
+                       err = fmt.Errorf("arvados.GetConfig(): %s", err)
+                       sendErr(http.StatusInternalServerError, err)
+                       return
+               }
+               agg.Config = cfg
+       }
+       cluster, err := agg.Config.GetCluster("")
+       if err != nil {
+               err = fmt.Errorf("arvados.GetCluster(): %s", err)
+               sendErr(http.StatusInternalServerError, err)
+               return
+       }
+       if !agg.checkAuth(req, cluster) {
+               sendErr(http.StatusUnauthorized, errUnauthorized)
+               return
+       }
+       if req.URL.Path != "/_health/all" {
+               sendErr(http.StatusNotFound, errNotFound)
+               return
+       }
+       json.NewEncoder(resp).Encode(agg.checkClusterHealth(cluster))
+       if agg.Log != nil {
+               agg.Log(req, nil)
+       }
+}
+
+// serviceHealth summarizes one kind of service across all nodes: N is
+// the number of endpoints that answered healthy, and Health is "OK"
+// when N > 0, otherwise "ERROR".
+type serviceHealth struct {
+       Health string `json:"health"`
+       N      int    `json:"n"`
+}
+
+// clusterHealthResponse is the JSON body returned for /_health/all.
+// Health is "ERROR" if any endpoint check failed. Endpoints maps
+// "node/service/_health/ping" to that endpoint's ping result, and
+// Services maps each service name to its summary.
+type clusterHealthResponse struct {
+       Health    string                            `json:"health"`
+       Endpoints map[string]map[string]interface{} `json:"endpoints"`
+       Services  map[string]serviceHealth          `json:"services"`
+}
+
+func (agg *Aggregator) checkClusterHealth(cluster *arvados.Cluster) clusterHealthResponse {
+       resp := clusterHealthResponse{
+               Health:    "OK",
+               Endpoints: make(map[string]map[string]interface{}),
+               Services:  make(map[string]serviceHealth),
+       }
+
+       mtx := sync.Mutex{}
+       wg := sync.WaitGroup{}
+       for node, nodeConfig := range cluster.SystemNodes {
+               for svc, addr := range map[string]string{
+                       "keepstore": nodeConfig.Keepstore.Listen,
+               } {
+                       if addr == "" {
+                               continue
+                       }
+                       wg.Add(1)
+                       go func() {
+                               defer wg.Done()
+                               pingResp := agg.ping(node, addr)
+
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               resp.Endpoints[node+"/"+svc+"/_health/ping"] = pingResp
+                               svHealth := resp.Services[svc]
+                               if agg.isOK(pingResp) {
+                                       svHealth.N++
+                               } else {
+                                       resp.Health = "ERROR"
+                               }
+                               resp.Services[svc] = svHealth
+                       }()
+               }
+       }
+       wg.Wait()
+
+       for svc, svHealth := range resp.Services {
+               if svHealth.N > 0 {
+                       svHealth.Health = "OK"
+               } else {
+                       svHealth.Health = "ERROR"
+               }
+               resp.Services[svc] = svHealth
+       }
+
+       return resp
+}
+
+func (agg *Aggregator) isOK(result map[string]interface{}) bool {
+       h, ok := result["health"].(string)
+       return ok && h == "OK"
+}
+
+// ping fetches http://{node}:{port}/_health/ping, where port is taken
+// from addr, and returns the decoded JSON object. A "responseTime"
+// entry (seconds, 6 decimal places) is always added. On any failure,
+// including a non-200 status, "health" is set to "ERROR" and "error"
+// to the error message.
+func (agg *Aggregator) ping(node, addr string) (result map[string]interface{}) {
+       t0 := time.Now()
+       result = make(map[string]interface{})
+
+       var err error
+       defer func() {
+               result["responseTime"] = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+               if err != nil {
+                       // Store the message: error values generally
+                       // encode to JSON as {}.
+                       result["health"], result["error"] = "ERROR", err.Error()
+               }
+       }()
+
+       _, port, err := net.SplitHostPort(addr)
+       if err != nil {
+               return
+       }
+       req, err := http.NewRequest("GET", "http://"+node+":"+port+"/_health/ping", nil)
+       if err != nil {
+               return
+       }
+
+       // Fall back to defaults so ping works even if setup has not run.
+       timeout := time.Duration(agg.timeout)
+       if timeout <= 0 {
+               timeout = time.Duration(defaultTimeout)
+       }
+       client := agg.httpClient
+       if client == nil {
+               client = http.DefaultClient
+       }
+
+       // WithTimeout plus a deferred cancel releases the timer as
+       // soon as ping returns, instead of leaving a goroutine waiting
+       // out the full timeout.
+       ctx, cancel := context.WithTimeout(req.Context(), timeout)
+       defer cancel()
+       req = req.WithContext(ctx)
+
+       resp, err := client.Do(req)
+       if err != nil {
+               return
+       }
+       defer resp.Body.Close()
+
+       // Decode into a separate map (through a pointer, as Decode
+       // requires) and merge, so a JSON null body cannot nil out
+       // result before the deferred func writes to it.
+       var body map[string]interface{}
+       err = json.NewDecoder(resp.Body).Decode(&body)
+       for k, v := range body {
+               result[k] = v
+       }
+       if resp.StatusCode != http.StatusOK {
+               err = fmt.Errorf("HTTP %d %s", resp.StatusCode, http.StatusText(resp.StatusCode))
+       }
+       return
+}
+
+func (agg *Aggregator) checkAuth(req *http.Request, cluster *arvados.Cluster) bool {
+       creds := auth.NewCredentialsFromHTTPRequest(req)
+       for _, token := range creds.Tokens {
+               if token != "" && token == cluster.ManagementToken {
+                       return true
+               }
+       }
+       return false
+}
diff --git a/sdk/go/health/aggregator_test.go b/sdk/go/health/aggregator_test.go
new file mode 100644 (file)
index 0000000..2cb7122
--- /dev/null
@@ -0,0 +1,73 @@
+package health
+
+import (
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "gopkg.in/check.v1"
+)
+
+// AggregatorSuite exercises Aggregator's HTTP handling. SetUpTest
+// gives each test a fresh handler, request, and response recorder.
+type AggregatorSuite struct {
+       handler *Aggregator
+       req     *http.Request
+       resp    *httptest.ResponseRecorder
+}
+
+// Gocheck boilerplate: register the suite with gocheck.
+var _ = check.Suite(&AggregatorSuite{})
+
+// TestInterface fails to compile unless *Aggregator implements
+// http.Handler.
+func (s *AggregatorSuite) TestInterface(c *check.C) {
+       var _ http.Handler = (*Aggregator)(nil)
+}
+
+func (s *AggregatorSuite) SetUpTest(c *check.C) {
+       s.handler = &Aggregator{Config: &arvados.Config{
+               Clusters: map[string]arvados.Cluster{
+                       "zzzzz": {
+                               ManagementToken: arvadostest.ManagementToken,
+                               SystemNodes:     map[string]arvados.SystemNode{},
+                       },
+               },
+       }}
+       s.req = httptest.NewRequest("GET", "/_health/all", nil)
+       s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+       s.resp = httptest.NewRecorder()
+}
+
+func (s *AggregatorSuite) TestNoAuth(c *check.C) {
+       s.req.Header.Del("Authorization")
+       s.handler.ServeHTTP(s.resp, s.req)
+       s.checkError(c)
+       c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
+}
+
+func (s *AggregatorSuite) TestBadAuth(c *check.C) {
+       s.req.Header.Set("Authorization", "xyzzy")
+       s.handler.ServeHTTP(s.resp, s.req)
+       s.checkError(c)
+       c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
+}
+
+func (s *AggregatorSuite) TestEmptyConfig(c *check.C) {
+       s.handler.ServeHTTP(s.resp, s.req)
+       s.checkOK(c)
+}
+
+// checkError checks that the recorded response has a non-200 status
+// and a JSON body whose "health" is not "OK".
+func (s *AggregatorSuite) checkError(c *check.C) {
+       c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
+       body := s.decodeBody(c)
+       c.Check(body["health"], check.Not(check.Equals), "OK")
+}
+
+// checkOK checks that the recorded response has status 200 and a JSON
+// body whose "health" is "OK".
+func (s *AggregatorSuite) checkOK(c *check.C) {
+       c.Check(s.resp.Code, check.Equals, http.StatusOK)
+       body := s.decodeBody(c)
+       c.Check(body["health"], check.Equals, "OK")
+}
+
+// decodeBody decodes the recorded response body as a JSON object and
+// checks that decoding succeeded.
+func (s *AggregatorSuite) decodeBody(c *check.C) map[string]interface{} {
+       var body map[string]interface{}
+       err := json.NewDecoder(s.resp.Body).Decode(&body)
+       c.Check(err, check.IsNil)
+       return body
+}
diff --git a/services/health/main.go b/services/health/main.go
new file mode 100644 (file)
index 0000000..7f4d648
--- /dev/null
@@ -0,0 +1,33 @@
+package main
+
+import (
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/health"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       log "github.com/Sirupsen/logrus"
+)
+
+func main() {
+       log.SetFormatter(&log.JSONFormatter{
+               TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+       })
+       sysConf, err := arvados.GetSystemConfig()
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       srv := &httpserver.Server{
+               Addr: ":", // FIXME: should be dictated by Health on this SystemNode
+               Handler: &health.Aggregator{
+                       SystemConfig: sysConf,
+               },
+       }
+       srv.HandleFunc()
+       if err := srv.Start(); err != nil {
+               log.Fatal(err)
+       }
+       log.WithField("Listen", srv.Addr).Info("listening")
+       if err := srv.Wait(); err != nil {
+               log.Fatal(err)
+       }
+}