services/crunchstat
services/dockercleaner
services/fuse
+services/health
services/keep-web
services/keepproxy
services/keepstore
lib/crunchstat
services/arv-git-httpd
services/crunchstat
+ services/health
services/keep-web
services/keepstore
sdk/go/keepclient
--- /dev/null
+package arvados
+
+import (
+ "fmt"
+ "os"
+
+ "git.curoverse.com/arvados.git/sdk/go/config"
+)
+
+// DefaultConfigFile is the path from which the cluster-wide
+// configuration is loaded unless overridden on the command line.
+const DefaultConfigFile = "/etc/arvados/config.yml"
+
+// Config is the top-level system configuration: a map from cluster
+// ID to that cluster's settings.
+type Config struct {
+ Clusters map[string]Cluster
+}
+
+// GetConfig returns the current system config, loading it from
+// configFile if needed.
+//
+// The returned *Config is always non-nil; callers must still check
+// the returned error before using it.
+func GetConfig(configFile string) (*Config, error) {
+ var cfg Config
+ err := config.LoadFile(&cfg, configFile)
+ return &cfg, err
+}
+
+// GetCluster returns the config for the given cluster, or for the
+// default/only configured cluster if clusterID is "".
+//
+// An error is returned if clusterID is "" while zero or multiple
+// clusters are configured, or if the named cluster is not present.
+func (sc *Config) GetCluster(clusterID string) (*Cluster, error) {
+ if clusterID == "" {
+ if len(sc.Clusters) == 0 {
+ return nil, fmt.Errorf("no clusters configured")
+ } else if len(sc.Clusters) > 1 {
+ return nil, fmt.Errorf("multiple clusters configured, cannot choose")
+ } else {
+ // Exactly one cluster is configured: return a copy of
+ // it with ClusterID filled in from the map key.
+ for id, cc := range sc.Clusters {
+ cc.ClusterID = id
+ return &cc, nil
+ }
+ }
+ }
+ if cc, ok := sc.Clusters[clusterID]; !ok {
+ return nil, fmt.Errorf("cluster %q is not configured", clusterID)
+ } else {
+ // cc is a copy of the map entry, so setting ClusterID here
+ // does not mutate the stored config.
+ cc.ClusterID = clusterID
+ return &cc, nil
+ }
+}
+
+// Cluster holds the configuration for a single cluster.
+type Cluster struct {
+ ClusterID string `json:"-"` // filled in by GetCluster from the map key, never loaded from the file
+ ManagementToken string // token required by management/health endpoints
+ SystemNodes map[string]SystemNode // hostname (or "*" for a default) -> per-node service config
+}
+
+// GetThisSystemNode returns a SystemNode for the node we're running
+// on right now, looked up by os.Hostname().
+func (cc *Cluster) GetThisSystemNode() (*SystemNode, error) {
+ hostname, err := os.Hostname()
+ if err != nil {
+ return nil, err
+ }
+ return cc.GetSystemNode(hostname)
+}
+
+// GetSystemNode returns a SystemNode for the given hostname. An error
+// is returned if the appropriate configuration can't be determined
+// (e.g., this does not appear to be a system node).
+//
+// The returned pointer refers to a copy of the map entry; mutating it
+// does not change the stored configuration.
+func (cc *Cluster) GetSystemNode(node string) (*SystemNode, error) {
+ if cfg, ok := cc.SystemNodes[node]; ok {
+ return &cfg, nil
+ }
+ // If node is not listed, but "*" gives a default system node
+ // config, use the default config.
+ if cfg, ok := cc.SystemNodes["*"]; ok {
+ return &cfg, nil
+ }
+ return nil, fmt.Errorf("config does not provision host %q as a system node", node)
+}
+
+// SystemNode describes which Arvados services run on one node. The
+// JSON keys are the canonical service names used in config files.
+type SystemNode struct {
+ Health SystemServiceInstance `json:"arvados-health"`
+ Keepproxy SystemServiceInstance `json:"keepproxy"`
+ Keepstore SystemServiceInstance `json:"keepstore"`
+ Keepweb SystemServiceInstance `json:"keep-web"`
+ Nodemanager SystemServiceInstance `json:"arvados-node-manager"`
+ RailsAPI SystemServiceInstance `json:"arvados-api-server"`
+ Websocket SystemServiceInstance `json:"arvados-ws"`
+ Workbench SystemServiceInstance `json:"arvados-workbench"`
+}
+
+// ServicePorts returns the configured listening address (or "" if
+// disabled) for each service on the node.
+//
+// NOTE(review): the "arvados-health" service itself is not included
+// here — presumably so the health aggregator does not check itself;
+// confirm that omission is intentional.
+func (sn *SystemNode) ServicePorts() map[string]string {
+ return map[string]string{
+ "arvados-api-server": sn.RailsAPI.Listen,
+ "arvados-node-manager": sn.Nodemanager.Listen,
+ "arvados-workbench": sn.Workbench.Listen,
+ "arvados-ws": sn.Websocket.Listen,
+ "keep-web": sn.Keepweb.Listen,
+ "keepproxy": sn.Keepproxy.Listen,
+ "keepstore": sn.Keepstore.Listen,
+ }
+}
+
+// SystemServiceInstance configures one service instance on a node.
+// Listen is the address to listen on; "" means the service is
+// disabled on this node.
+type SystemServiceInstance struct {
+ Listen string
+}
func (a *Credentials) LoadTokensFromHTTPRequest(r *http.Request) {
// Load plain token from "Authorization: OAuth2 ..." header
// (typically used by smart API clients)
- if toks := strings.SplitN(r.Header.Get("Authorization"), " ", 2); len(toks) == 2 && toks[0] == "OAuth2" {
+ if toks := strings.SplitN(r.Header.Get("Authorization"), " ", 2); len(toks) == 2 && (toks[0] == "OAuth2" || toks[0] == "Bearer") {
a.Tokens = append(a.Tokens, toks[1])
}
--- /dev/null
+package health
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net"
+ "net/http"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+)
+
+// defaultTimeout bounds each individual health-check request.
+const defaultTimeout = arvados.Duration(2 * time.Second)
+
+// Aggregator implements http.Handler. It handles "GET /_health/all"
+// by checking the health of all configured services on the cluster
+// and responding 200 if everything is healthy.
+type Aggregator struct {
+ setupOnce sync.Once // guards one-time init in setup()
+ httpClient *http.Client
+ timeout arvados.Duration // per-check timeout; zero means defaultTimeout
+
+ Config *arvados.Config
+
+ // If non-nil, Log is called after handling each request.
+ Log func(*http.Request, error)
+}
+
+// setup performs one-time initialization (via setupOnce). The shared
+// http.DefaultClient has no timeout of its own; each request is
+// bounded by a per-request context deadline in ping().
+func (agg *Aggregator) setup() {
+ agg.httpClient = http.DefaultClient
+ if agg.timeout == 0 {
+ // this is always the case, except in the test suite
+ agg.timeout = defaultTimeout
+ }
+}
+
+// ServeHTTP authenticates the request against the cluster's
+// management token, then responds to "GET /_health/all" with an
+// aggregated ClusterHealthResponse as JSON.
+func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ agg.setupOnce.Do(agg.setup)
+ // sendErr writes an error document and logs the failure.
+ sendErr := func(statusCode int, err error) {
+ resp.WriteHeader(statusCode)
+ json.NewEncoder(resp).Encode(map[string]string{"error": err.Error()})
+ if agg.Log != nil {
+ agg.Log(req, err)
+ }
+ }
+
+ resp.Header().Set("Content-Type", "application/json")
+
+ cluster, err := agg.Config.GetCluster("")
+ if err != nil {
+ err = fmt.Errorf("arvados.GetCluster(): %s", err)
+ sendErr(http.StatusInternalServerError, err)
+ return
+ }
+ // Auth is checked before the path so unauthorized callers can't
+ // probe which paths exist.
+ if !agg.checkAuth(req, cluster) {
+ sendErr(http.StatusUnauthorized, errUnauthorized)
+ return
+ }
+ if req.URL.Path != "/_health/all" {
+ sendErr(http.StatusNotFound, errNotFound)
+ return
+ }
+ json.NewEncoder(resp).Encode(agg.ClusterHealth(cluster))
+ if agg.Log != nil {
+ agg.Log(req, nil)
+ }
+}
+
+// ClusterHealthResponse is the JSON document returned by
+// "GET /_health/all".
+type ClusterHealthResponse struct {
+ // "OK" if all needed services are OK, otherwise "ERROR".
+ Health string `json:"health"`
+
+ // An entry for each known health check of each known instance
+ // of each needed component: "instance of service S on node N
+ // reports health-check C is OK."
+ Checks map[string]CheckResult `json:"checks"`
+
+ // An entry for each service type: "service S is OK." This
+ // exposes problems that can't be expressed in Checks, like
+ // "service S is needed, but isn't configured to run
+ // anywhere."
+ Services map[string]ServiceHealth `json:"services"`
+}
+
+// CheckResult is the outcome of pinging one service instance.
+type CheckResult struct {
+ Health string `json:"health"` // "OK" or "ERROR"
+ Error string `json:"error,omitempty"` // failure detail when Health is "ERROR"
+ HTTPStatusCode int `json:",omitempty"` // zero if the request never completed
+ HTTPStatusText string `json:",omitempty"`
+ Response map[string]interface{} `json:"response"` // decoded JSON body from the service
+ ResponseTime json.Number `json:"responseTime"` // seconds, as a decimal string
+}
+
+// ServiceHealth summarizes one service type across the cluster.
+type ServiceHealth struct {
+ Health string `json:"health"` // "OK" if at least one instance reported OK
+ N int `json:"n"` // number of instances reporting OK
+}
+
+// ClusterHealth pings every configured service instance on every
+// system node concurrently and aggregates the results.
+func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResponse {
+ resp := ClusterHealthResponse{
+ Health: "OK",
+ Checks: make(map[string]CheckResult),
+ Services: make(map[string]ServiceHealth),
+ }
+
+ // mtx guards resp's maps and Health field, which are written
+ // from the checker goroutines below.
+ mtx := sync.Mutex{}
+ wg := sync.WaitGroup{}
+ for node, nodeConfig := range cluster.SystemNodes {
+ for svc, addr := range nodeConfig.ServicePorts() {
+ // Ensure svc is listed in resp.Services.
+ mtx.Lock()
+ if _, ok := resp.Services[svc]; !ok {
+ resp.Services[svc] = ServiceHealth{Health: "ERROR"}
+ }
+ mtx.Unlock()
+
+ if addr == "" {
+ // svc is not expected on this node.
+ continue
+ }
+
+ wg.Add(1)
+ // Loop variables are passed as arguments so each
+ // goroutine sees its own node/svc/addr values.
+ go func(node, svc, addr string) {
+ defer wg.Done()
+ var result CheckResult
+ url, err := agg.pingURL(node, addr)
+ if err != nil {
+ result = CheckResult{
+ Health: "ERROR",
+ Error: err.Error(),
+ }
+ } else {
+ result = agg.ping(url, cluster)
+ }
+
+ mtx.Lock()
+ defer mtx.Unlock()
+ resp.Checks[svc+"+"+url] = result
+ if result.Health == "OK" {
+ h := resp.Services[svc]
+ h.N++
+ h.Health = "OK"
+ resp.Services[svc] = h
+ } else {
+ resp.Health = "ERROR"
+ }
+ }(node, svc, addr)
+ }
+ }
+ wg.Wait()
+
+ // Report ERROR if a needed service didn't fail any checks
+ // merely because it isn't configured to run anywhere.
+ for _, sh := range resp.Services {
+ if sh.Health != "OK" {
+ resp.Health = "ERROR"
+ break
+ }
+ }
+ return resp
+}
+
+// pingURL returns the health-check URL for the given service address
+// on the given node. The URL string is returned even when err is
+// non-nil (the caller uses it as a map key either way).
+func (agg *Aggregator) pingURL(node, addr string) (string, error) {
+ _, port, err := net.SplitHostPort(addr)
+ return "http://" + node + ":" + port + "/_health/ping", err
+}
+
+// ping GETs url with the cluster's management token and returns a
+// CheckResult describing the outcome. Health is "OK" only if the
+// request completes within agg.timeout, the body decodes as JSON,
+// and the HTTP status is 200.
+func (agg *Aggregator) ping(url string, cluster *arvados.Cluster) (result CheckResult) {
+ t0 := time.Now()
+
+ var err error
+ // The deferred func fills in the fields shared by all exit
+ // paths, based on the final value of err.
+ defer func() {
+ result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+ if err != nil {
+ result.Health, result.Error = "ERROR", err.Error()
+ } else {
+ result.Health = "OK"
+ }
+ }()
+
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return
+ }
+ req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
+
+ ctx, cancel := context.WithTimeout(req.Context(), time.Duration(agg.timeout))
+ defer cancel()
+ req = req.WithContext(ctx)
+ resp, err := agg.httpClient.Do(req)
+ if err != nil {
+ return
+ }
+ // Close the body on every path so the transport can reuse the
+ // connection instead of leaking it.
+ defer resp.Body.Close()
+ result.HTTPStatusCode = resp.StatusCode
+ result.HTTPStatusText = resp.Status
+ err = json.NewDecoder(resp.Body).Decode(&result.Response)
+ if err != nil {
+ err = fmt.Errorf("cannot decode response: %s", err)
+ return
+ } else if resp.StatusCode != http.StatusOK {
+ err = fmt.Errorf("HTTP %d %s", resp.StatusCode, resp.Status)
+ return
+ }
+ return
+}
+
+// checkAuth reports whether req carries the cluster's management
+// token in any recognized credential slot.
+func (agg *Aggregator) checkAuth(req *http.Request, cluster *arvados.Cluster) bool {
+ creds := auth.NewCredentialsFromHTTPRequest(req)
+ for _, token := range creds.Tokens {
+ // An empty configured token must never match.
+ if token != "" && token == cluster.ManagementToken {
+ return true
+ }
+ }
+ return false
+}
--- /dev/null
+package health
+
+import (
+ "encoding/json"
+ "fmt"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "gopkg.in/check.v1"
+)
+
+// AggregatorSuite holds per-test fixtures: the handler under test
+// plus a canned request and response recorder.
+type AggregatorSuite struct {
+ handler *Aggregator
+ req *http.Request
+ resp *httptest.ResponseRecorder
+}
+
+// Gocheck boilerplate
+var _ = check.Suite(&AggregatorSuite{})
+
+// TestInterface is a compile-time check that Aggregator satisfies
+// http.Handler.
+func (s *AggregatorSuite) TestInterface(c *check.C) {
+ var _ http.Handler = &Aggregator{}
+}
+
+// SetUpTest runs before each test: it installs a fresh Aggregator
+// with a single cluster (no system nodes yet), an authorized request
+// for /_health/all, and a new response recorder.
+func (s *AggregatorSuite) SetUpTest(c *check.C) {
+ s.handler = &Aggregator{Config: &arvados.Config{
+ Clusters: map[string]arvados.Cluster{
+ "zzzzz": {
+ ManagementToken: arvadostest.ManagementToken,
+ SystemNodes: map[string]arvados.SystemNode{},
+ },
+ },
+ }}
+ s.req = httptest.NewRequest("GET", "/_health/all", nil)
+ s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+ s.resp = httptest.NewRecorder()
+}
+
+// TestNoAuth: a request without credentials is rejected with 401.
+func (s *AggregatorSuite) TestNoAuth(c *check.C) {
+ s.req.Header.Del("Authorization")
+ s.handler.ServeHTTP(s.resp, s.req)
+ s.checkError(c)
+ c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
+}
+
+// TestBadAuth: a malformed/incorrect credential is rejected with 401.
+func (s *AggregatorSuite) TestBadAuth(c *check.C) {
+ s.req.Header.Set("Authorization", "xyzzy")
+ s.handler.ServeHTTP(s.resp, s.req)
+ s.checkError(c)
+ c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
+}
+
+// TestEmptyConfig: a cluster with no system nodes reports overall OK
+// (nothing is configured, so nothing can fail).
+func (s *AggregatorSuite) TestEmptyConfig(c *check.C) {
+ s.handler.ServeHTTP(s.resp, s.req)
+ s.checkOK(c)
+}
+
+// stubServer starts an httptest server running handler and returns
+// it along with a ":port" listen address suitable for a SystemNode
+// config entry. The caller must Close() the server.
+func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
+ srv := httptest.NewServer(handler)
+ var port string
+ // srv.URL looks like "http://127.0.0.1:PORT"; take the last
+ // colon-separated field as the port.
+ if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
+ panic(srv.URL)
+ } else {
+ port = parts[len(parts)-1]
+ }
+ return srv, ":" + port
+}
+
+// unhealthyHandler stubs a service whose ping endpoint responds 200
+// but reports an ERROR health document.
+type unhealthyHandler struct{}
+
+func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if req.URL.Path == "/_health/ping" {
+ resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
+ } else {
+ http.Error(resp, "not found", http.StatusNotFound)
+ }
+}
+
+// TestUnhealthy: one configured keepstore reporting ERROR makes the
+// aggregate response ERROR.
+func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
+ srv, listen := s.stubServer(&unhealthyHandler{})
+ defer srv.Close()
+ // SystemNode fields are typed SystemServiceInstance (there is
+ // no arvados.Keepstore type).
+ s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+ Keepstore: arvados.SystemServiceInstance{Listen: listen},
+ }
+ s.handler.ServeHTTP(s.resp, s.req)
+ s.checkUnhealthy(c)
+}
+
+// healthyHandler stubs a service whose ping endpoint reports OK.
+type healthyHandler struct{}
+
+func (*healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if req.URL.Path == "/_health/ping" {
+ resp.Write([]byte(`{"health":"OK"}`))
+ } else {
+ http.Error(resp, "not found", http.StatusNotFound)
+ }
+}
+
+// TestHealthy: a single healthy keepstore yields overall OK and a
+// per-check entry with HTTP 200.
+func (s *AggregatorSuite) TestHealthy(c *check.C) {
+ srv, listen := s.stubServer(&healthyHandler{})
+ defer srv.Close()
+ _, port, _ := net.SplitHostPort(listen)
+ s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+ Keepstore: arvados.SystemServiceInstance{Listen: listen},
+ }
+ s.handler.ServeHTTP(s.resp, s.req)
+ resp := s.checkOK(c)
+ // port is a string from net.SplitHostPort, so format with %s.
+ // Check keys have the form "svc+url" (see ClusterHealth).
+ ep := resp.Checks[fmt.Sprintf("keepstore+http://localhost:%s/_health/ping", port)]
+ c.Check(ep.Health, check.Equals, "OK")
+ c.Check(ep.HTTPStatusCode, check.Equals, 200)
+}
+
+// TestHealthyAndUnhealthy: one healthy and one unhealthy keepstore
+// instance yield overall ERROR, with per-instance results intact.
+func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
+ srvH, listenH := s.stubServer(&healthyHandler{})
+ defer srvH.Close()
+ _, portH, _ := net.SplitHostPort(listenH)
+ srvU, listenU := s.stubServer(&unhealthyHandler{})
+ defer srvU.Close()
+ _, portU, _ := net.SplitHostPort(listenU)
+ s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+ Keepstore: arvados.SystemServiceInstance{Listen: listenH},
+ }
+ s.handler.Config.Clusters["zzzzz"].SystemNodes["127.0.0.1"] = arvados.SystemNode{
+ Keepstore: arvados.SystemServiceInstance{Listen: listenU},
+ }
+ s.handler.ServeHTTP(s.resp, s.req)
+ resp := s.checkUnhealthy(c)
+ // Ports are strings, so format with %s; check keys are "svc+url".
+ ep := resp.Checks[fmt.Sprintf("keepstore+http://localhost:%s/_health/ping", portH)]
+ c.Check(ep.Health, check.Equals, "OK")
+ c.Check(ep.HTTPStatusCode, check.Equals, 200)
+ // The unhealthy stub responds 200 with an ERROR health document.
+ ep = resp.Checks[fmt.Sprintf("keepstore+http://127.0.0.1:%s/_health/ping", portU)]
+ c.Check(ep.Health, check.Equals, "ERROR")
+ c.Check(ep.HTTPStatusCode, check.Equals, 200)
+}
+
+// checkError asserts a non-200 HTTP response whose JSON body does not
+// claim overall health "OK".
+func (s *AggregatorSuite) checkError(c *check.C) {
+ c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
+ var resp ClusterHealthResponse
+ err := json.NewDecoder(s.resp.Body).Decode(&resp)
+ c.Check(err, check.IsNil)
+ c.Check(resp.Health, check.Not(check.Equals), "OK")
+}
+
+// checkUnhealthy asserts HTTP 200 with overall health "ERROR" and
+// returns the decoded response for further checks.
+func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse {
+ return s.checkResult(c, "ERROR")
+}
+
+// checkOK asserts HTTP 200 with overall health "OK" and returns the
+// decoded response for further checks.
+func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse {
+ return s.checkResult(c, "OK")
+}
+
+// checkResult asserts HTTP 200 and the given overall health value,
+// returning the decoded body.
+func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
+ c.Check(s.resp.Code, check.Equals, http.StatusOK)
+ var resp ClusterHealthResponse
+ err := json.NewDecoder(s.resp.Body).Decode(&resp)
+ c.Check(err, check.IsNil)
+ c.Check(resp.Health, check.Equals, health)
+ return resp
+}
+
+// slowHandler stubs a service whose ping endpoint takes 3s to
+// respond — longer than the aggregator timeout used in
+// TestPingTimeout, so the check should fail with a timeout.
+type slowHandler struct{}
+
+func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if req.URL.Path == "/_health/ping" {
+ time.Sleep(3 * time.Second)
+ resp.Write([]byte(`{"health":"OK"}`))
+ } else {
+ http.Error(resp, "not found", http.StatusNotFound)
+ }
+}
+
+// TestPingTimeout: a service slower than the configured timeout is
+// reported ERROR with no HTTP status and a measured response time.
+func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
+ s.handler.timeout = arvados.Duration(100 * time.Millisecond)
+ srv, listen := s.stubServer(&slowHandler{})
+ defer srv.Close()
+ _, port, _ := net.SplitHostPort(listen)
+ s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+ Keepstore: arvados.SystemServiceInstance{Listen: listen},
+ }
+ s.handler.ServeHTTP(s.resp, s.req)
+ resp := s.checkUnhealthy(c)
+ // Check keys have the form "svc+url" (see ClusterHealth), not
+ // "node/svc/path".
+ ep := resp.Checks[fmt.Sprintf("keepstore+http://localhost:%s/_health/ping", port)]
+ c.Check(ep.Health, check.Equals, "ERROR")
+ // The request never completed, so no HTTP status was recorded.
+ c.Check(ep.HTTPStatusCode, check.Equals, 0)
+ rt, err := ep.ResponseTime.Float64()
+ c.Check(err, check.IsNil)
+ c.Check(rt > 0.005, check.Equals, true)
+}
--- /dev/null
+package main
+
+import (
+ "flag"
+ "net/http"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/health"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ log "github.com/Sirupsen/logrus"
+)
+
+// main loads the cluster config, finds this node's service config,
+// and runs the health aggregator on the configured listen address.
+func main() {
+ configFile := flag.String("config", arvados.DefaultConfigFile, "`path` to arvados configuration file")
+ flag.Parse()
+
+ log.SetFormatter(&log.JSONFormatter{
+ TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+ })
+ cfg, err := arvados.GetConfig(*configFile)
+ if err != nil {
+ log.Fatal(err)
+ }
+ clusterCfg, err := cfg.GetCluster("")
+ if err != nil {
+ log.Fatal(err)
+ }
+ nodeCfg, err := clusterCfg.GetThisSystemNode()
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // Shadows the package-level logger with a field-annotated entry
+ // for everything below.
+ log := log.WithField("Service", "Health")
+ srv := &httpserver.Server{
+ Addr: nodeCfg.Health.Listen,
+ Server: http.Server{
+ Handler: &health.Aggregator{
+ Config: cfg,
+ Log: func(req *http.Request, err error) {
+ log.WithField("RemoteAddr", req.RemoteAddr).
+ WithField("Path", req.URL.Path).
+ WithError(err).
+ Info("HTTP request")
+ },
+ },
+ },
+ }
+ if err := srv.Start(); err != nil {
+ log.Fatal(err)
+ }
+ log.WithField("Listen", srv.Addr).Info("listening")
+ if err := srv.Wait(); err != nil {
+ log.Fatal(err)
+ }
+}