return 2
}
_, basename := filepath.Split(prog)
+ if strings.HasPrefix(basename, "arvados-") {
+ basename = basename[8:]
+ } else if strings.HasPrefix(basename, "crunch-") {
+ basename = basename[7:]
+ }
if cmd, ok := m[basename]; ok {
return cmd.RunCommand(prog, args, stdin, stdout, stderr)
} else if cmd, ok = m[args[0]]; ok {
package controller
import (
- "flag"
- "fmt"
- "io"
"net/http"
"git.curoverse.com/arvados.git/lib/cmd"
+ "git.curoverse.com/arvados.git/lib/service"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "github.com/Sirupsen/logrus"
)
-const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-
-var Command cmd.Handler = &command{}
-
-type command struct{}
-
-func (*command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
- log := logrus.StandardLogger()
- log.Formatter = &logrus.JSONFormatter{
- TimestampFormat: rfc3339NanoFixed,
- }
- log.Out = stderr
-
- var err error
- defer func() {
- if err != nil {
- log.WithError(err).Info("exiting")
- }
- }()
- flags := flag.NewFlagSet("", flag.ContinueOnError)
- flags.SetOutput(stderr)
- configFile := flags.String("config", arvados.DefaultConfigFile, "Site configuration `file`")
- err = flags.Parse(args)
- if err != nil {
- return 2
- }
- cfg, err := arvados.GetConfig(*configFile)
- if err != nil {
- return 1
- }
- cluster, err := cfg.GetCluster("")
- if err != nil {
- return 1
- }
- node, err := cluster.GetThisSystemNode()
- if err != nil {
- return 1
- }
- if node.Controller.Listen == "" {
- err = fmt.Errorf("configuration does not run a controller on this host: Clusters[%q].SystemNodes[`hostname` or *].Controller.Listen == \"\"", cluster.ClusterID)
- return 1
- }
- srv := &httpserver.Server{
- Server: http.Server{
- Handler: httpserver.LogRequests(&Handler{
- Cluster: cluster,
- }),
- },
- Addr: node.Controller.Listen,
- }
- err = srv.Start()
- if err != nil {
- return 1
- }
- log.WithField("Listen", srv.Addr).Info("listening")
- err = srv.Wait()
- if err != nil {
- return 1
- }
- return 0
-}
+// Command is the cmd.Handler that runs the controller service. It is
+// built by service.Command for arvados.ServiceNameController, using a
+// factory that returns a Handler for the given cluster; the node
+// config argument is ignored.
+var Command cmd.Handler = service.Command(arvados.ServiceNameController, func(cluster *arvados.Cluster, _ *arvados.SystemNode) http.Handler {
+ return &Handler{Cluster: cluster}
+})
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/health"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
)
type Handler struct {
Cluster *arvados.Cluster
setupOnce sync.Once
- mux http.ServeMux
handlerStack http.Handler
proxyClient *arvados.Client
}
}
func (h *Handler) setup() {
- h.mux.Handle("/_health/", &health.Handler{
+ mux := http.NewServeMux()
+ mux.Handle("/_health/", &health.Handler{
Token: h.Cluster.ManagementToken,
Prefix: "/_health/",
})
- h.mux.Handle("/", http.HandlerFunc(h.proxyRailsAPI))
- h.handlerStack = httpserver.LogRequests(&h.mux)
+ mux.Handle("/", http.HandlerFunc(h.proxyRailsAPI))
+ h.handlerStack = mux
}
func (h *Handler) proxyRailsAPI(w http.ResponseWriter, incomingReq *http.Request) {
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package service provides a cmd.Handler that brings up a system service.
+package service
+
+import (
+ "flag"
+ "fmt"
+ "io"
+ "net/http"
+
+ "git.curoverse.com/arvados.git/lib/cmd"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/Sirupsen/logrus"
+)
+
+// NewHandlerFunc builds the http.Handler for a service, given the
+// cluster config and the config of the system node it runs on.
+type NewHandlerFunc func(*arvados.Cluster, *arvados.SystemNode) http.Handler
+
+// command is the cmd.Handler implementation returned by Command.
+type command struct {
+ newHandler NewHandlerFunc // builds the handler that RunCommand serves
+ svcName arvados.ServiceName // key used to look up the listen address in the node's ServicePorts
+}
+
+// Command returns a cmd.Handler that loads site config, calls
+// newHandler with the current cluster and node configs, and brings up
+// an http server with the returned handler.
+//
+// svcName selects which of the node's configured listen addresses
+// (see SystemNode.ServicePorts) the server listens on, and is
+// included in log messages.
+//
+// The handler is wrapped with server middleware (adding X-Request-ID
+// headers, logging requests/responses, etc).
+func Command(svcName arvados.ServiceName, newHandler NewHandlerFunc) cmd.Handler {
+ return &command{
+ newHandler: newHandler,
+ svcName: svcName,
+ }
+}
+
+// RunCommand loads the site configuration, looks up this node's
+// listen address for the service, starts an http server with the
+// handler returned by newHandler, and waits for the server to finish.
+//
+// The only command line flag defined is -config (the site
+// configuration file). Log entries are written to stderr in JSON
+// format. prog, stdin, and stdout are not used.
+//
+// The return value is an exit code: 0 if help was requested or the
+// server finished without error, 2 if the command line arguments
+// could not be parsed, and 1 for any other failure (loading config,
+// service not enabled on this host, or a server start/wait error).
+func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ // Use a dedicated logger instead of the logrus standard logger.
+ log := logrus.New()
+ log.Formatter = &logrus.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ }
+ log.Out = stderr
+
+ // Whatever error is left in err when this function returns is
+ // logged by the deferred func. The := assignments below are in
+ // the same scope, so they reuse this err rather than shadowing it.
+ var err error
+ defer func() {
+ if err != nil {
+ log.WithError(err).Info("exiting")
+ }
+ }()
+ flags := flag.NewFlagSet("", flag.ContinueOnError)
+ flags.SetOutput(stderr)
+ configFile := flags.String("config", arvados.DefaultConfigFile, "Site configuration `file`")
+ err = flags.Parse(args)
+ if err == flag.ErrHelp {
+ // Help was requested: clear err so nothing is logged on exit.
+ err = nil
+ return 0
+ } else if err != nil {
+ return 2
+ }
+ cfg, err := arvados.GetConfig(*configFile)
+ if err != nil {
+ return 1
+ }
+ // NOTE(review): "" presumably selects the default cluster -- confirm against GetCluster.
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ return 1
+ }
+ node, err := cluster.GetThisSystemNode()
+ if err != nil {
+ return 1
+ }
+ // An empty listen address means the service is not enabled on this node.
+ listen := node.ServicePorts()[c.svcName]
+ if listen == "" {
+ err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
+ return 1
+ }
+ srv := &httpserver.Server{
+ Server: http.Server{
+ // Middleware order: AddRequestIDs is outermost, then LogRequests, then the service's own handler.
+ Handler: httpserver.AddRequestIDs(httpserver.LogRequests(log, c.newHandler(cluster, node))),
+ },
+ Addr: listen,
+ }
+ err = srv.Start()
+ if err != nil {
+ return 1
+ }
+ log.WithFields(logrus.Fields{
+ "Listen": srv.Addr,
+ "Service": c.svcName,
+ }).Info("listening")
+ err = srv.Wait()
+ if err != nil {
+ return 1
+ }
+ return 0
+}
+
+// rfc3339NanoFixed is the timestamp layout used for log entries. It
+// is like time.RFC3339Nano, but the fractional seconds are written
+// with zeros in the layout, so all nine digits are always printed
+// instead of having trailing zeros trimmed.
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
Workbench SystemServiceInstance `json:"arvados-workbench"`
}
+// ServiceName is the name that identifies a system service, e.g. as
+// a key in the map returned by SystemNode.ServicePorts.
+type ServiceName string
+
+// Names of the known system services.
+const (
+ ServiceNameRailsAPI ServiceName = "arvados-api-server"
+ ServiceNameController ServiceName = "arvados-controller"
+ ServiceNameNodemanager ServiceName = "arvados-node-manager"
+ ServiceNameWorkbench ServiceName = "arvados-workbench"
+ ServiceNameWebsocket ServiceName = "arvados-ws"
+ ServiceNameKeepweb ServiceName = "keep-web"
+ ServiceNameKeepproxy ServiceName = "keepproxy"
+ ServiceNameKeepstore ServiceName = "keepstore"
+)
+
// ServicePorts returns the configured listening address (or "" if
// disabled) for each service on the node.
-func (sn *SystemNode) ServicePorts() map[string]string {
- return map[string]string{
- "arvados-api-server": sn.RailsAPI.Listen,
- "arvados-controller": sn.Controller.Listen,
- "arvados-node-manager": sn.Nodemanager.Listen,
- "arvados-workbench": sn.Workbench.Listen,
- "arvados-ws": sn.Websocket.Listen,
- "keep-web": sn.Keepweb.Listen,
- "keepproxy": sn.Keepproxy.Listen,
- "keepstore": sn.Keepstore.Listen,
+func (sn *SystemNode) ServicePorts() map[ServiceName]string {
+ return map[ServiceName]string{
+ ServiceNameRailsAPI: sn.RailsAPI.Listen,
+ ServiceNameController: sn.Controller.Listen,
+ ServiceNameNodemanager: sn.Nodemanager.Listen,
+ ServiceNameWorkbench: sn.Workbench.Listen,
+ ServiceNameWebsocket: sn.Websocket.Listen,
+ ServiceNameKeepweb: sn.Keepweb.Listen,
+ ServiceNameKeepproxy: sn.Keepproxy.Listen,
+ ServiceNameKeepstore: sn.Keepstore.Listen,
}
}
// exposes problems that can't be expressed in Checks, like
// "service S is needed, but isn't configured to run
// anywhere."
- Services map[string]ServiceHealth `json:"services"`
+ Services map[arvados.ServiceName]ServiceHealth `json:"services"`
}
type CheckResult struct {
resp := ClusterHealthResponse{
Health: "OK",
Checks: make(map[string]CheckResult),
- Services: make(map[string]ServiceHealth),
+ Services: make(map[arvados.ServiceName]ServiceHealth),
}
mtx := sync.Mutex{}
}
wg.Add(1)
- go func(node, svc, addr string) {
+ go func(node string, svc arvados.ServiceName, addr string) {
defer wg.Done()
var result CheckResult
url, err := agg.pingURL(node, addr)
mtx.Lock()
defer mtx.Unlock()
- resp.Checks[svc+"+"+url] = result
+ resp.Checks[fmt.Sprintf("%s+%s", svc, url)] = result
if result.Health == "OK" {
h := resp.Services[svc]
h.N++
var requestTimeContextKey = contextKey{"requestTime"}
-var Logger logrus.FieldLogger = logrus.StandardLogger()
-
// LogRequests wraps an http.Handler, logging each request and
-// response via logrus.
-func LogRequests(h http.Handler) http.Handler {
+// response via logger.
+func LogRequests(logger logrus.FieldLogger, h http.Handler) http.Handler {
+ if logger == nil {
+ logger = logrus.StandardLogger()
+ }
return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
- lgr := Logger.WithFields(logrus.Fields{
+ lgr := logger.WithFields(logrus.Fields{
"RequestID": req.Header.Get("X-Request-Id"),
"remoteAddr": req.RemoteAddr,
"reqForwardedFor": req.Header.Get("X-Forwarded-For"),
"encoding/json"
"net/http"
"net/http/httptest"
- "os"
"testing"
"time"
- log "github.com/Sirupsen/logrus"
+ "github.com/Sirupsen/logrus"
check "gopkg.in/check.v1"
)
type Suite struct{}
func (s *Suite) TestLogRequests(c *check.C) {
- defer log.SetOutput(os.Stdout)
captured := &bytes.Buffer{}
- log.SetOutput(captured)
- log.SetFormatter(&log.JSONFormatter{
+ log := logrus.New()
+ log.Out = captured
+ log.Formatter = &logrus.JSONFormatter{
TimestampFormat: time.RFC3339Nano,
- })
+ }
+
h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("hello world"))
})
req.Header.Set("X-Forwarded-For", "1.2.3.4:12345")
c.Assert(err, check.IsNil)
resp := httptest.NewRecorder()
- AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req)
+ AddRequestIDs(LogRequests(log, h)).ServeHTTP(resp, req)
dec := json.NewDecoder(captured)
}
func (srv *server) Start() error {
- srv.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(&handler{Config: srv.Config}))
+ srv.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(nil, &handler{Config: srv.Config}))
srv.Addr = srv.Config.Listen
return srv.Server.Start()
}
// Start serving requests.
router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
- http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
+ http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(nil, router)))
log.Println("shutting down")
}
mux := http.NewServeMux()
mux.Handle("/", theConfig.metrics.Instrument(
- httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter))))
+ httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter))))
mux.HandleFunc("/metrics.json", theConfig.metrics.exportJSON)
mux.Handle("/metrics", theConfig.metrics.exportProm)