13497: Move common system service code to lib/service.
authorTom Clegg <tclegg@veritasgenetics.com>
Tue, 12 Jun 2018 15:10:12 +0000 (11:10 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Thu, 14 Jun 2018 17:35:40 +0000 (13:35 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/cmd/cmd.go
lib/controller/cmd.go
lib/controller/handler.go
lib/service/cmd.go [new file with mode: 0644]
sdk/go/arvados/config.go
sdk/go/health/aggregator.go
sdk/go/httpserver/logger.go
sdk/go/httpserver/logger_test.go
services/keep-web/server.go
services/keepproxy/keepproxy.go
services/keepstore/handlers.go

index 0d3a07a6151887f67447db33307cd6da2d59ffa1..353167e80080cad13bac71d10eb1d33d3dd7034b 100644 (file)
@@ -58,6 +58,11 @@ func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 2
        }
        _, basename := filepath.Split(prog)
+       if strings.HasPrefix(basename, "arvados-") {
+               basename = basename[8:]
+       } else if strings.HasPrefix(basename, "crunch-") {
+               basename = basename[7:]
+       }
        if cmd, ok := m[basename]; ok {
                return cmd.RunCommand(prog, args, stdin, stdout, stderr)
        } else if cmd, ok = m[args[0]]; ok {
index c13d0fa07a1f2faeb7c0d276904b7345ee5e4ec0..2bb68aed9ed90b31999dd794aee8ff8da0aff5b1 100644 (file)
@@ -5,75 +5,13 @@
 package controller
 
 import (
-       "flag"
-       "fmt"
-       "io"
        "net/http"
 
        "git.curoverse.com/arvados.git/lib/cmd"
+       "git.curoverse.com/arvados.git/lib/service"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "github.com/Sirupsen/logrus"
 )
 
-const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-
-var Command cmd.Handler = &command{}
-
-type command struct{}
-
-func (*command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
-       log := logrus.StandardLogger()
-       log.Formatter = &logrus.JSONFormatter{
-               TimestampFormat: rfc3339NanoFixed,
-       }
-       log.Out = stderr
-
-       var err error
-       defer func() {
-               if err != nil {
-                       log.WithError(err).Info("exiting")
-               }
-       }()
-       flags := flag.NewFlagSet("", flag.ContinueOnError)
-       flags.SetOutput(stderr)
-       configFile := flags.String("config", arvados.DefaultConfigFile, "Site configuration `file`")
-       err = flags.Parse(args)
-       if err != nil {
-               return 2
-       }
-       cfg, err := arvados.GetConfig(*configFile)
-       if err != nil {
-               return 1
-       }
-       cluster, err := cfg.GetCluster("")
-       if err != nil {
-               return 1
-       }
-       node, err := cluster.GetThisSystemNode()
-       if err != nil {
-               return 1
-       }
-       if node.Controller.Listen == "" {
-               err = fmt.Errorf("configuration does not run a controller on this host: Clusters[%q].SystemNodes[`hostname` or *].Controller.Listen == \"\"", cluster.ClusterID)
-               return 1
-       }
-       srv := &httpserver.Server{
-               Server: http.Server{
-                       Handler: httpserver.LogRequests(&Handler{
-                               Cluster: cluster,
-                       }),
-               },
-               Addr: node.Controller.Listen,
-       }
-       err = srv.Start()
-       if err != nil {
-               return 1
-       }
-       log.WithField("Listen", srv.Addr).Info("listening")
-       err = srv.Wait()
-       if err != nil {
-               return 1
-       }
-       return 0
-}
+var Command cmd.Handler = service.Command(arvados.ServiceNameController, func(cluster *arvados.Cluster, _ *arvados.SystemNode) http.Handler {
+       return &Handler{Cluster: cluster}
+})
index c51f8668bfc8c763af38557fa33560abd1b255b0..f0354d94d93b164c28906b192c7f02770c12c5f4 100644 (file)
@@ -12,14 +12,12 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/health"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
 )
 
 type Handler struct {
        Cluster *arvados.Cluster
 
        setupOnce    sync.Once
-       mux          http.ServeMux
        handlerStack http.Handler
        proxyClient  *arvados.Client
 }
@@ -30,12 +28,13 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 }
 
 func (h *Handler) setup() {
-       h.mux.Handle("/_health/", &health.Handler{
+       mux := http.NewServeMux()
+       mux.Handle("/_health/", &health.Handler{
                Token:  h.Cluster.ManagementToken,
                Prefix: "/_health/",
        })
-       h.mux.Handle("/", http.HandlerFunc(h.proxyRailsAPI))
-       h.handlerStack = httpserver.LogRequests(&h.mux)
+       mux.Handle("/", http.HandlerFunc(h.proxyRailsAPI))
+       h.handlerStack = mux
 }
 
 func (h *Handler) proxyRailsAPI(w http.ResponseWriter, incomingReq *http.Request) {
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
new file mode 100644 (file)
index 0000000..e59ac48
--- /dev/null
@@ -0,0 +1,101 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package service provides a cmd.Handler that brings up a system service.
+package service
+
+import (
+       "flag"
+       "fmt"
+       "io"
+       "net/http"
+
+       "git.curoverse.com/arvados.git/lib/cmd"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/Sirupsen/logrus"
+)
+
+type NewHandlerFunc func(*arvados.Cluster, *arvados.SystemNode) http.Handler
+
+type command struct {
+       newHandler NewHandlerFunc
+       svcName    arvados.ServiceName
+}
+
+// Command returns a cmd.Handler that loads site config, calls
+// newHandler with the current cluster and node configs, and brings up
+// an http server with the returned handler.
+//
+// The handler is wrapped with server middleware (adding X-Request-ID
+// headers, logging requests/responses, etc).
+func Command(svcName arvados.ServiceName, newHandler NewHandlerFunc) cmd.Handler {
+       return &command{
+               newHandler: newHandler,
+               svcName:    svcName,
+       }
+}
+
+func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       log := logrus.New()
+       log.Formatter = &logrus.JSONFormatter{
+               TimestampFormat: rfc3339NanoFixed,
+       }
+       log.Out = stderr
+
+       var err error
+       defer func() {
+               if err != nil {
+                       log.WithError(err).Info("exiting")
+               }
+       }()
+       flags := flag.NewFlagSet("", flag.ContinueOnError)
+       flags.SetOutput(stderr)
+       configFile := flags.String("config", arvados.DefaultConfigFile, "Site configuration `file`")
+       err = flags.Parse(args)
+       if err == flag.ErrHelp {
+               err = nil
+               return 0
+       } else if err != nil {
+               return 2
+       }
+       cfg, err := arvados.GetConfig(*configFile)
+       if err != nil {
+               return 1
+       }
+       cluster, err := cfg.GetCluster("")
+       if err != nil {
+               return 1
+       }
+       node, err := cluster.GetThisSystemNode()
+       if err != nil {
+               return 1
+       }
+       listen := node.ServicePorts()[c.svcName]
+       if listen == "" {
+               err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
+               return 1
+       }
+       srv := &httpserver.Server{
+               Server: http.Server{
+                       Handler: httpserver.AddRequestIDs(httpserver.LogRequests(log, c.newHandler(cluster, node))),
+               },
+               Addr: listen,
+       }
+       err = srv.Start()
+       if err != nil {
+               return 1
+       }
+       log.WithFields(logrus.Fields{
+               "Listen":  srv.Addr,
+               "Service": c.svcName,
+       }).Info("listening")
+       err = srv.Wait()
+       if err != nil {
+               return 1
+       }
+       return 0
+}
+
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
index 2de13d784146fef8fcc58ed1e4b156605c624622..875a274dc6259aedfd648a1785d01ce85bb828dc 100644 (file)
@@ -101,18 +101,31 @@ type SystemNode struct {
        Workbench   SystemServiceInstance `json:"arvados-workbench"`
 }
 
+type ServiceName string
+
+const (
+       ServiceNameRailsAPI    ServiceName = "arvados-api-server"
+       ServiceNameController  ServiceName = "arvados-controller"
+       ServiceNameNodemanager ServiceName = "arvados-node-manager"
+       ServiceNameWorkbench   ServiceName = "arvados-workbench"
+       ServiceNameWebsocket   ServiceName = "arvados-ws"
+       ServiceNameKeepweb     ServiceName = "keep-web"
+       ServiceNameKeepproxy   ServiceName = "keepproxy"
+       ServiceNameKeepstore   ServiceName = "keepstore"
+)
+
 // ServicePorts returns the configured listening address (or "" if
 // disabled) for each service on the node.
-func (sn *SystemNode) ServicePorts() map[string]string {
-       return map[string]string{
-               "arvados-api-server":   sn.RailsAPI.Listen,
-               "arvados-controller":   sn.Controller.Listen,
-               "arvados-node-manager": sn.Nodemanager.Listen,
-               "arvados-workbench":    sn.Workbench.Listen,
-               "arvados-ws":           sn.Websocket.Listen,
-               "keep-web":             sn.Keepweb.Listen,
-               "keepproxy":            sn.Keepproxy.Listen,
-               "keepstore":            sn.Keepstore.Listen,
+func (sn *SystemNode) ServicePorts() map[ServiceName]string {
+       return map[ServiceName]string{
+               ServiceNameRailsAPI:    sn.RailsAPI.Listen,
+               ServiceNameController:  sn.Controller.Listen,
+               ServiceNameNodemanager: sn.Nodemanager.Listen,
+               ServiceNameWorkbench:   sn.Workbench.Listen,
+               ServiceNameWebsocket:   sn.Websocket.Listen,
+               ServiceNameKeepweb:     sn.Keepweb.Listen,
+               ServiceNameKeepproxy:   sn.Keepproxy.Listen,
+               ServiceNameKeepstore:   sn.Keepstore.Listen,
        }
 }
 
index 5edb1f95ca86acbfac5bf7dfd961822a33003ee1..3f3a918004e58255ca06a99b142d825fb5f75354 100644 (file)
@@ -87,7 +87,7 @@ type ClusterHealthResponse struct {
        // exposes problems that can't be expressed in Checks, like
        // "service S is needed, but isn't configured to run
        // anywhere."
-       Services map[string]ServiceHealth `json:"services"`
+       Services map[arvados.ServiceName]ServiceHealth `json:"services"`
 }
 
 type CheckResult struct {
@@ -108,7 +108,7 @@ func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResp
        resp := ClusterHealthResponse{
                Health:   "OK",
                Checks:   make(map[string]CheckResult),
-               Services: make(map[string]ServiceHealth),
+               Services: make(map[arvados.ServiceName]ServiceHealth),
        }
 
        mtx := sync.Mutex{}
@@ -128,7 +128,7 @@ func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResp
                        }
 
                        wg.Add(1)
-                       go func(node, svc, addr string) {
+                       go func(node string, svc arvados.ServiceName, addr string) {
                                defer wg.Done()
                                var result CheckResult
                                url, err := agg.pingURL(node, addr)
@@ -143,7 +143,7 @@ func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResp
 
                                mtx.Lock()
                                defer mtx.Unlock()
-                               resp.Checks[svc+"+"+url] = result
+                               resp.Checks[fmt.Sprintf("%s+%s", svc, url)] = result
                                if result.Health == "OK" {
                                        h := resp.Services[svc]
                                        h.N++
index 1a4b7c55925b20eb398cc9d9c402004a0d2f779c..ec3fa7fae18dc6b88022b95771efc1de83f96107 100644 (file)
@@ -19,15 +19,16 @@ type contextKey struct {
 
 var requestTimeContextKey = contextKey{"requestTime"}
 
-var Logger logrus.FieldLogger = logrus.StandardLogger()
-
 // LogRequests wraps an http.Handler, logging each request and
-// response via logrus.
-func LogRequests(h http.Handler) http.Handler {
+// response via logger (or logrus.StandardLogger() if logger is nil).
+func LogRequests(logger logrus.FieldLogger, h http.Handler) http.Handler {
+       if logger == nil {
+               logger = logrus.StandardLogger()
+       }
        return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
                w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
                req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
-               lgr := Logger.WithFields(logrus.Fields{
+               lgr := logger.WithFields(logrus.Fields{
                        "RequestID":       req.Header.Get("X-Request-Id"),
                        "remoteAddr":      req.RemoteAddr,
                        "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
index bbcafa143957ae0a165840e58336f52336b8d919..bdde3303e2f97c35b45e73c1dd207f30e521e13d 100644 (file)
@@ -9,11 +9,10 @@ import (
        "encoding/json"
        "net/http"
        "net/http/httptest"
-       "os"
        "testing"
        "time"
 
-       log "github.com/Sirupsen/logrus"
+       "github.com/Sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
@@ -26,12 +25,13 @@ var _ = check.Suite(&Suite{})
 type Suite struct{}
 
 func (s *Suite) TestLogRequests(c *check.C) {
-       defer log.SetOutput(os.Stdout)
        captured := &bytes.Buffer{}
-       log.SetOutput(captured)
-       log.SetFormatter(&log.JSONFormatter{
+       log := logrus.New()
+       log.Out = captured
+       log.Formatter = &logrus.JSONFormatter{
                TimestampFormat: time.RFC3339Nano,
-       })
+       }
+
        h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
                w.Write([]byte("hello world"))
        })
@@ -39,7 +39,7 @@ func (s *Suite) TestLogRequests(c *check.C) {
        req.Header.Set("X-Forwarded-For", "1.2.3.4:12345")
        c.Assert(err, check.IsNil)
        resp := httptest.NewRecorder()
-       AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req)
+       AddRequestIDs(LogRequests(log, h)).ServeHTTP(resp, req)
 
        dec := json.NewDecoder(captured)
 
index 2995bd30abe0008fb9623aa1758907aae111ae8c..e51376c3bc35cc10a92bf5a6f7c646a18bea3476 100644 (file)
@@ -14,7 +14,7 @@ type server struct {
 }
 
 func (srv *server) Start() error {
-       srv.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(&handler{Config: srv.Config}))
+       srv.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(nil, &handler{Config: srv.Config}))
        srv.Addr = srv.Config.Listen
        return srv.Server.Start()
 }
index 07fc63b63f8c34b9b5a3d6c49dacdc1d44f36f88..b6c8bd66aa40f026051ba8b4885ce562fb580721 100644 (file)
@@ -182,7 +182,7 @@ func main() {
 
        // Start serving requests.
        router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
-       http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
+       http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(nil, router)))
 
        log.Println("shutting down")
 }
index a84a84db3c6027147168cc78f0e4615bde54ad2b..fb327a386b0f33fdae30f1e0d3e4f880c8d0bfa1 100644 (file)
@@ -92,7 +92,7 @@ func MakeRESTRouter() http.Handler {
 
        mux := http.NewServeMux()
        mux.Handle("/", theConfig.metrics.Instrument(
-               httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter))))
+               httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter))))
        mux.HandleFunc("/metrics.json", theConfig.metrics.exportJSON)
        mux.Handle("/metrics", theConfig.metrics.exportProm)