From 8f76037ba8a37c488612285ffe70d26d0d038124 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 12 Jun 2018 11:10:12 -0400 Subject: [PATCH] 13497: Move common system service code to lib/service. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/cmd/cmd.go | 5 ++ lib/controller/cmd.go | 70 ++------------------- lib/controller/handler.go | 9 ++- lib/service/cmd.go | 101 +++++++++++++++++++++++++++++++ sdk/go/arvados/config.go | 33 +++++++--- sdk/go/health/aggregator.go | 8 +-- sdk/go/httpserver/logger.go | 11 ++-- sdk/go/httpserver/logger_test.go | 14 ++--- services/keep-web/server.go | 2 +- services/keepproxy/keepproxy.go | 2 +- services/keepstore/handlers.go | 2 +- 11 files changed, 157 insertions(+), 100 deletions(-) create mode 100644 lib/service/cmd.go diff --git a/lib/cmd/cmd.go b/lib/cmd/cmd.go index 0d3a07a615..353167e800 100644 --- a/lib/cmd/cmd.go +++ b/lib/cmd/cmd.go @@ -58,6 +58,11 @@ func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 2 } _, basename := filepath.Split(prog) + if strings.HasPrefix(basename, "arvados-") { + basename = basename[8:] + } else if strings.HasPrefix(basename, "crunch-") { + basename = basename[7:] + } if cmd, ok := m[basename]; ok { return cmd.RunCommand(prog, args, stdin, stdout, stderr) } else if cmd, ok = m[args[0]]; ok { diff --git a/lib/controller/cmd.go b/lib/controller/cmd.go index c13d0fa07a..2bb68aed9e 100644 --- a/lib/controller/cmd.go +++ b/lib/controller/cmd.go @@ -5,75 +5,13 @@ package controller import ( - "flag" - "fmt" - "io" "net/http" "git.curoverse.com/arvados.git/lib/cmd" + "git.curoverse.com/arvados.git/lib/service" "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/httpserver" - "github.com/Sirupsen/logrus" ) -const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" - -var Command cmd.Handler = &command{} - -type command struct{} - -func (*command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { - log := logrus.StandardLogger() - log.Formatter = &logrus.JSONFormatter{ - TimestampFormat: rfc3339NanoFixed, - } - log.Out = stderr - - var err error - defer func() { - if err != nil { - log.WithError(err).Info("exiting") - } - }() - flags := flag.NewFlagSet("", flag.ContinueOnError) - flags.SetOutput(stderr) - configFile := flags.String("config", arvados.DefaultConfigFile, "Site configuration `file`") - err = flags.Parse(args) - if err != nil { - return 2 - } - cfg, err := arvados.GetConfig(*configFile) - if err != nil { - return 1 - } - cluster, err := cfg.GetCluster("") - if err != nil { - return 1 - } - node, err := cluster.GetThisSystemNode() - if err != nil { - return 1 - } - if node.Controller.Listen == "" { - err = fmt.Errorf("configuration does not run a controller on this host: Clusters[%q].SystemNodes[`hostname` or *].Controller.Listen == \"\"", cluster.ClusterID) - return 1 - } - srv := &httpserver.Server{ - Server: http.Server{ - Handler: httpserver.LogRequests(&Handler{ - Cluster: cluster, - }), - }, - Addr: node.Controller.Listen, - } - err = srv.Start() - if err != nil { - return 1 - } - log.WithField("Listen", srv.Addr).Info("listening") - err = srv.Wait() - if err != nil { - return 1 - } - return 0 -} +var Command cmd.Handler = service.Command(arvados.ServiceNameController, func(cluster *arvados.Cluster, _ *arvados.SystemNode) http.Handler { + return &Handler{Cluster: cluster} +}) diff --git a/lib/controller/handler.go b/lib/controller/handler.go index c51f8668bf..f0354d94d9 100644 --- a/lib/controller/handler.go +++ b/lib/controller/handler.go @@ -12,14 +12,12 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/health" - "git.curoverse.com/arvados.git/sdk/go/httpserver" ) type Handler struct { Cluster *arvados.Cluster setupOnce sync.Once - mux http.ServeMux handlerStack http.Handler proxyClient *arvados.Client } @@ -30,12 +28,13 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } func (h *Handler) setup() { - h.mux.Handle("/_health/", &health.Handler{ + mux := http.NewServeMux() + mux.Handle("/_health/", &health.Handler{ Token: h.Cluster.ManagementToken, Prefix: "/_health/", }) - h.mux.Handle("/", http.HandlerFunc(h.proxyRailsAPI)) - h.handlerStack = httpserver.LogRequests(&h.mux) + mux.Handle("/", http.HandlerFunc(h.proxyRailsAPI)) + h.handlerStack = mux } func (h *Handler) proxyRailsAPI(w http.ResponseWriter, incomingReq *http.Request) { diff --git a/lib/service/cmd.go b/lib/service/cmd.go new file mode 100644 index 0000000000..e59ac486a8 --- /dev/null +++ b/lib/service/cmd.go @@ -0,0 +1,101 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +// package service provides a cmd.Handler that brings up a system service. +package service + +import ( + "flag" + "fmt" + "io" + "net/http" + + "git.curoverse.com/arvados.git/lib/cmd" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/httpserver" + "github.com/Sirupsen/logrus" +) + +type NewHandlerFunc func(*arvados.Cluster, *arvados.SystemNode) http.Handler + +type command struct { + newHandler NewHandlerFunc + svcName arvados.ServiceName +} + +// Command returns a cmd.Handler that loads site config, calls +// newHandler with the current cluster and node configs, and brings up +// an http server with the returned handler. +// +// The handler is wrapped with server middleware (adding X-Request-ID +// headers, logging requests/responses, etc). +func Command(svcName arvados.ServiceName, newHandler NewHandlerFunc) cmd.Handler { + return &command{ + newHandler: newHandler, + svcName: svcName, + } +} + +func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + log := logrus.New() + log.Formatter = &logrus.JSONFormatter{ + TimestampFormat: rfc3339NanoFixed, + } + log.Out = stderr + + var err error + defer func() { + if err != nil { + log.WithError(err).Info("exiting") + } + }() + flags := flag.NewFlagSet("", flag.ContinueOnError) + flags.SetOutput(stderr) + configFile := flags.String("config", arvados.DefaultConfigFile, "Site configuration `file`") + err = flags.Parse(args) + if err == flag.ErrHelp { + err = nil + return 0 + } else if err != nil { + return 2 + } + cfg, err := arvados.GetConfig(*configFile) + if err != nil { + return 1 + } + cluster, err := cfg.GetCluster("") + if err != nil { + return 1 + } + node, err := cluster.GetThisSystemNode() + if err != nil { + return 1 + } + listen := node.ServicePorts()[c.svcName] + if listen == "" { + err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName) + return 1 + } + srv := &httpserver.Server{ + Server: http.Server{ + Handler: httpserver.AddRequestIDs(httpserver.LogRequests(log, c.newHandler(cluster, node))), + }, + Addr: listen, + } + err = srv.Start() + if err != nil { + return 1 + } + log.WithFields(logrus.Fields{ + "Listen": srv.Addr, + "Service": c.svcName, + }).Info("listening") + err = srv.Wait() + if err != nil { + return 1 + } + return 0 +} + +const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index 2de13d7841..875a274dc6 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -101,18 +101,31 @@ type SystemNode struct { Workbench SystemServiceInstance `json:"arvados-workbench"` } +type ServiceName string + +const ( + ServiceNameRailsAPI ServiceName = "arvados-api-server" + ServiceNameController ServiceName = "arvados-controller" + ServiceNameNodemanager ServiceName = "arvados-node-manager" + ServiceNameWorkbench ServiceName = "arvados-workbench" + ServiceNameWebsocket ServiceName = "arvados-ws" + ServiceNameKeepweb ServiceName = "keep-web" + ServiceNameKeepproxy ServiceName = "keepproxy" + ServiceNameKeepstore ServiceName = "keepstore" +) + // ServicePorts returns the configured listening address (or "" if // disabled) for each service on the node. -func (sn *SystemNode) ServicePorts() map[string]string { - return map[string]string{ - "arvados-api-server": sn.RailsAPI.Listen, - "arvados-controller": sn.Controller.Listen, - "arvados-node-manager": sn.Nodemanager.Listen, - "arvados-workbench": sn.Workbench.Listen, - "arvados-ws": sn.Websocket.Listen, - "keep-web": sn.Keepweb.Listen, - "keepproxy": sn.Keepproxy.Listen, - "keepstore": sn.Keepstore.Listen, +func (sn *SystemNode) ServicePorts() map[ServiceName]string { + return map[ServiceName]string{ + ServiceNameRailsAPI: sn.RailsAPI.Listen, + ServiceNameController: sn.Controller.Listen, + ServiceNameNodemanager: sn.Nodemanager.Listen, + ServiceNameWorkbench: sn.Workbench.Listen, + ServiceNameWebsocket: sn.Websocket.Listen, + ServiceNameKeepweb: sn.Keepweb.Listen, + ServiceNameKeepproxy: sn.Keepproxy.Listen, + ServiceNameKeepstore: sn.Keepstore.Listen, } } diff --git a/sdk/go/health/aggregator.go b/sdk/go/health/aggregator.go index 5edb1f95ca..3f3a918004 100644 --- a/sdk/go/health/aggregator.go +++ b/sdk/go/health/aggregator.go @@ -87,7 +87,7 @@ type ClusterHealthResponse struct { // exposes problems that can't be expressed in Checks, like // "service S is needed, but isn't configured to run // anywhere." - Services map[string]ServiceHealth `json:"services"` + Services map[arvados.ServiceName]ServiceHealth `json:"services"` } type CheckResult struct { @@ -108,7 +108,7 @@ func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResp resp := ClusterHealthResponse{ Health: "OK", Checks: make(map[string]CheckResult), - Services: make(map[string]ServiceHealth), + Services: make(map[arvados.ServiceName]ServiceHealth), } mtx := sync.Mutex{} @@ -128,7 +128,7 @@ func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResp } wg.Add(1) - go func(node, svc, addr string) { + go func(node string, svc arvados.ServiceName, addr string) { defer wg.Done() var result CheckResult url, err := agg.pingURL(node, addr) @@ -143,7 +143,7 @@ func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResp mtx.Lock() defer mtx.Unlock() - resp.Checks[svc+"+"+url] = result + resp.Checks[fmt.Sprintf("%s+%s", svc, url)] = result if result.Health == "OK" { h := resp.Services[svc] h.N++ diff --git a/sdk/go/httpserver/logger.go b/sdk/go/httpserver/logger.go index 1a4b7c5592..ec3fa7fae1 100644 --- a/sdk/go/httpserver/logger.go +++ b/sdk/go/httpserver/logger.go @@ -19,15 +19,16 @@ type contextKey struct { var requestTimeContextKey = contextKey{"requestTime"} -var Logger logrus.FieldLogger = logrus.StandardLogger() - // LogRequests wraps an http.Handler, logging each request and -// response via logrus. -func LogRequests(h http.Handler) http.Handler { +// response via logger. +func LogRequests(logger logrus.FieldLogger, h http.Handler) http.Handler { + if logger == nil { + logger = logrus.StandardLogger() + } return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) { w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)} req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now())) - lgr := Logger.WithFields(logrus.Fields{ + lgr := logger.WithFields(logrus.Fields{ "RequestID": req.Header.Get("X-Request-Id"), "remoteAddr": req.RemoteAddr, "reqForwardedFor": req.Header.Get("X-Forwarded-For"), diff --git a/sdk/go/httpserver/logger_test.go b/sdk/go/httpserver/logger_test.go index bbcafa1439..bdde3303e2 100644 --- a/sdk/go/httpserver/logger_test.go +++ b/sdk/go/httpserver/logger_test.go @@ -9,11 +9,10 @@ import ( "encoding/json" "net/http" "net/http/httptest" - "os" "testing" "time" - log "github.com/Sirupsen/logrus" + "github.com/Sirupsen/logrus" check "gopkg.in/check.v1" ) @@ -26,12 +25,13 @@ var _ = check.Suite(&Suite{}) type Suite struct{} func (s *Suite) TestLogRequests(c *check.C) { - defer log.SetOutput(os.Stdout) captured := &bytes.Buffer{} - log.SetOutput(captured) - log.SetFormatter(&log.JSONFormatter{ + log := logrus.New() + log.Out = captured + log.Formatter = &logrus.JSONFormatter{ TimestampFormat: time.RFC3339Nano, - }) + } + h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { w.Write([]byte("hello world")) }) @@ -39,7 +39,7 @@ func (s *Suite) TestLogRequests(c *check.C) { req.Header.Set("X-Forwarded-For", "1.2.3.4:12345") c.Assert(err, check.IsNil) resp := httptest.NewRecorder() - AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req) + AddRequestIDs(LogRequests(log, h)).ServeHTTP(resp, req) dec := json.NewDecoder(captured) diff --git a/services/keep-web/server.go b/services/keep-web/server.go index 2995bd30ab..e51376c3bc 100644 --- a/services/keep-web/server.go +++ b/services/keep-web/server.go @@ -14,7 +14,7 @@ type server struct { } func (srv *server) Start() error { - srv.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(&handler{Config: srv.Config})) + srv.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(nil, &handler{Config: srv.Config})) srv.Addr = srv.Config.Listen return srv.Server.Start() } diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go index 07fc63b63f..b6c8bd66aa 100644 --- a/services/keepproxy/keepproxy.go +++ b/services/keepproxy/keepproxy.go @@ -182,7 +182,7 @@ func main() { // Start serving requests. router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken) - http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router))) + http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(nil, router))) log.Println("shutting down") } diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index a84a84db3c..fb327a386b 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -92,7 +92,7 @@ func MakeRESTRouter() http.Handler { mux := http.NewServeMux() mux.Handle("/", theConfig.metrics.Instrument( - httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter)))) + httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))) mux.HandleFunc("/metrics.json", theConfig.metrics.exportJSON) mux.Handle("/metrics", theConfig.metrics.exportProm) -- 2.30.2