Merge branch 'master' into 15531-logincluster-migrate
[arvados.git] / lib / service / cmd.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // package service provides a cmd.Handler that brings up a system service.
6 package service
7
8 import (
9         "context"
10         "flag"
11         "fmt"
12         "io"
13         "net"
14         "net/http"
15         "os"
16         "strings"
17
18         "git.curoverse.com/arvados.git/lib/cmd"
19         "git.curoverse.com/arvados.git/lib/config"
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
22         "git.curoverse.com/arvados.git/sdk/go/httpserver"
23         "github.com/coreos/go-systemd/daemon"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26 )
27
28 type Handler interface {
29         http.Handler
30         CheckHealth() error
31 }
32
33 type NewHandlerFunc func(_ context.Context, _ *arvados.Cluster, token string, registry *prometheus.Registry) Handler
34
35 type command struct {
36         newHandler NewHandlerFunc
37         svcName    arvados.ServiceName
38         ctx        context.Context // enables tests to shutdown service; no public API yet
39 }
40
41 // Command returns a cmd.Handler that loads site config, calls
42 // newHandler with the current cluster and node configs, and brings up
43 // an http server with the returned handler.
44 //
45 // The handler is wrapped with server middleware (adding X-Request-ID
46 // headers, logging requests/responses, etc).
47 func Command(svcName arvados.ServiceName, newHandler NewHandlerFunc) cmd.Handler {
48         return &command{
49                 newHandler: newHandler,
50                 svcName:    svcName,
51                 ctx:        context.Background(),
52         }
53 }
54
55 func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
56         log := ctxlog.New(stderr, "json", "info")
57
58         var err error
59         defer func() {
60                 if err != nil {
61                         log.WithError(err).Info("exiting")
62                 }
63         }()
64
65         flags := flag.NewFlagSet("", flag.ContinueOnError)
66         flags.SetOutput(stderr)
67
68         loader := config.NewLoader(stdin, log)
69         loader.SetupFlags(flags)
70         versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
71         err = flags.Parse(args)
72         if err == flag.ErrHelp {
73                 err = nil
74                 return 0
75         } else if err != nil {
76                 return 2
77         } else if *versionFlag {
78                 return cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
79         }
80
81         if strings.HasSuffix(prog, "controller") {
82                 // Some config-loader checks try to make API calls via
83                 // controller. Those can't be expected to work if this
84                 // process _is_ the controller: we haven't started an
85                 // http server yet.
86                 loader.SkipAPICalls = true
87         }
88
89         cfg, err := loader.Load()
90         if err != nil {
91                 return 1
92         }
93         cluster, err := cfg.GetCluster("")
94         if err != nil {
95                 return 1
96         }
97
98         // Now that we've read the config, replace the bootstrap
99         // logger with a new one according to the logging config.
100         log = ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel)
101         logger := log.WithFields(logrus.Fields{
102                 "PID": os.Getpid(),
103         })
104         ctx := ctxlog.Context(c.ctx, logger)
105
106         listenURL, err := getListenAddr(cluster.Services, c.svcName, log)
107         if err != nil {
108                 return 1
109         }
110         ctx = context.WithValue(ctx, contextKeyURL{}, listenURL)
111
112         reg := prometheus.NewRegistry()
113         handler := c.newHandler(ctx, cluster, cluster.SystemRootToken, reg)
114         if err = handler.CheckHealth(); err != nil {
115                 return 1
116         }
117
118         instrumented := httpserver.Instrument(reg, log,
119                 httpserver.HandlerWithContext(ctx,
120                         httpserver.AddRequestIDs(
121                                 httpserver.LogRequests(
122                                         httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))
123         srv := &httpserver.Server{
124                 Server: http.Server{
125                         Handler: instrumented.ServeAPI(cluster.ManagementToken, instrumented),
126                 },
127                 Addr: listenURL.Host,
128         }
129         if listenURL.Scheme == "https" {
130                 tlsconfig, err := tlsConfigWithCertUpdater(cluster, logger)
131                 if err != nil {
132                         logger.WithError(err).Errorf("cannot start %s service on %s", c.svcName, listenURL.String())
133                         return 1
134                 }
135                 srv.TLSConfig = tlsconfig
136         }
137         err = srv.Start()
138         if err != nil {
139                 return 1
140         }
141         logger.WithFields(logrus.Fields{
142                 "URL":     listenURL,
143                 "Listen":  srv.Addr,
144                 "Service": c.svcName,
145         }).Info("listening")
146         if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
147                 logger.WithError(err).Errorf("error notifying init daemon")
148         }
149         go func() {
150                 <-ctx.Done()
151                 srv.Close()
152         }()
153         err = srv.Wait()
154         if err != nil {
155                 return 1
156         }
157         return 0
158 }
159
160 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
161
162 func getListenAddr(svcs arvados.Services, prog arvados.ServiceName, log logrus.FieldLogger) (arvados.URL, error) {
163         svc, ok := svcs.Map()[prog]
164         if !ok {
165                 return arvados.URL{}, fmt.Errorf("unknown service name %q", prog)
166         }
167         for url := range svc.InternalURLs {
168                 if strings.HasPrefix(url.Host, "localhost:") {
169                         return url, nil
170                 }
171                 listener, err := net.Listen("tcp", url.Host)
172                 if err == nil {
173                         listener.Close()
174                         return url, nil
175                 } else if strings.Contains(err.Error(), "cannot assign requested address") {
176                         continue
177                 } else if strings.Contains(err.Error(), "address already in use") {
178                         return url, err
179                 } else {
180                         log.Warn(err)
181                 }
182         }
183         return arvados.URL{}, fmt.Errorf("configuration does not enable the %s service on this host", prog)
184 }
185
186 type contextKeyURL struct{}
187
188 func URLFromContext(ctx context.Context) (arvados.URL, bool) {
189         u, ok := ctx.Value(contextKeyURL{}).(arvados.URL)
190         return u, ok
191 }