X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c9fe930b422bd1675af3adb324ede3b5aa28888c..80a90301263f46ebb7b26297093763882f2cf582:/lib/boot/supervisor.go diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go index 7f5d6a9baa..0e4472b8a0 100644 --- a/lib/boot/supervisor.go +++ b/lib/boot/supervisor.go @@ -19,15 +19,18 @@ import ( "os/signal" "os/user" "path/filepath" + "reflect" "strings" "sync" "syscall" "time" + "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/lib/service" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/health" + "github.com/fsnotify/fsnotify" "github.com/sirupsen/logrus" ) @@ -45,7 +48,8 @@ type Supervisor struct { ctx context.Context cancel context.CancelFunc - done chan struct{} + done chan struct{} // closed when child procs/services have shut down + err error // error that caused shutdown (valid when done is closed) healthChecker *health.Aggregator tasksReady map[string]chan bool waitShutdown sync.WaitGroup @@ -55,30 +59,66 @@ type Supervisor struct { environ []string // for child processes } -func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config) { +func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) { super.ctx, super.cancel = context.WithCancel(ctx) super.done = make(chan struct{}) go func() { + defer close(super.done) + sigch := make(chan os.Signal) signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(sigch) go func() { for sig := range sigch { super.logger.WithField("signal", sig).Info("caught signal") + if super.err == nil { + super.err = fmt.Errorf("caught signal %s", sig) + } super.cancel() } }() + hupch := make(chan os.Signal) + signal.Notify(hupch, syscall.SIGHUP) + defer signal.Stop(hupch) + go func() { + for sig := range hupch { + super.logger.WithField("signal", sig).Info("caught signal") + if super.err == nil { + super.err = 
errNeedConfigReload + } + super.cancel() + } + }() + + if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig { + go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() { + if super.err == nil { + super.err = errNeedConfigReload + } + super.cancel() + }) + } + err := super.run(cfg) if err != nil { super.logger.WithError(err).Warn("supervisor shut down") + if super.err == nil { + super.err = err + } } - close(super.done) }() } +func (super *Supervisor) Wait() error { + <-super.done + return super.err +} + func (super *Supervisor) run(cfg *arvados.Config) error { + defer super.cancel() + cwd, err := os.Getwd() if err != nil { return err @@ -398,9 +438,9 @@ func (super *Supervisor) lookPath(prog string) string { return prog } -// Run prog with args, using dir as working directory. If ctx is -// cancelled while the child is running, RunProgram terminates the -// child, waits for it to exit, then returns. +// RunProgram runs prog with args, using dir as working directory. If ctx is +// cancelled while the child is running, RunProgram terminates the child, waits +// for it to exit, then returns. // // Child's environment will have our env vars, plus any given in env. 
// @@ -528,7 +568,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { if p == "0" { p = nextPort(h) } - cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p)} + cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"} } for _, svc := range []*arvados.Service{ &cluster.Services.Controller, @@ -549,18 +589,19 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { if svc.ExternalURL.Host == "" { if svc == &cluster.Services.Controller || svc == &cluster.Services.GitHTTP || + svc == &cluster.Services.Health || svc == &cluster.Services.Keepproxy || svc == &cluster.Services.WebDAV || svc == &cluster.Services.WebDAVDownload || svc == &cluster.Services.Workbench1 { - svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost))} + svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"} } else if svc == &cluster.Services.Websocket { - svc.ExternalURL = arvados.URL{Scheme: "wss", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost))} + svc.ExternalURL = arvados.URL{Scheme: "wss", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/websocket"} } } if len(svc.InternalURLs) == 0 { svc.InternalURLs = map[arvados.URL]arvados.ServiceInstance{ - arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost))}: arvados.ServiceInstance{}, + {Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}: {}, } } } @@ -570,12 +611,13 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { if cluster.ManagementToken == "" { cluster.ManagementToken = randomHexString(64) } - if cluster.API.RailsSessionSecretToken == "" { - cluster.API.RailsSessionSecretToken = randomHexString(64) - 
} if cluster.Collections.BlobSigningKey == "" { cluster.Collections.BlobSigningKey = randomHexString(64) } + if cluster.Users.AnonymousUserToken == "" { + cluster.Users.AnonymousUserToken = randomHexString(64) + } + if super.ClusterType != "production" && cluster.Containers.DispatchPrivateKey == "" { buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch")) if err != nil { @@ -588,7 +630,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { } if super.ClusterType == "test" { // Add a second keepstore process. - cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost))}] = arvados.ServiceInstance{} + cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}] = arvados.ServiceInstance{} // Create a directory-backed volume for each keepstore // process. 
@@ -706,3 +748,89 @@ func waitForConnect(ctx context.Context, addr string) error {
 	}
 	return ctx.Err()
 }
+
+// copyConfig returns a deep copy of cfg, made by round-tripping it
+// through its JSON encoding. It panics if cfg cannot be encoded or
+// decoded.
+func copyConfig(cfg *arvados.Config) *arvados.Config {
+	// Marshal and unmarshal in the calling goroutine, so a failure
+	// panics where the caller can recover from it rather than in a
+	// helper goroutine, where it would take down the whole process.
+	buf, err := json.Marshal(cfg)
+	if err != nil {
+		panic(err)
+	}
+	cfg2 := new(arvados.Config)
+	err = json.Unmarshal(buf, cfg2)
+	if err != nil {
+		panic(err)
+	}
+	return cfg2
+}
+
+// watchConfig watches the config file at cfgPath and calls fn each
+// time a reload yields a config that is not DeepEqual to the
+// previously seen one (initially prevcfg). It returns when ctx is
+// done or the watcher's channels are closed. If the watcher cannot
+// be set up, the error is logged and watchConfig returns without
+// ever calling fn.
+func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, fn func()) {
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		logger.WithError(err).Error("fsnotify setup failed")
+		return
+	}
+	defer watcher.Close()
+
+	err = watcher.Add(cfgPath)
+	if err != nil {
+		logger.WithError(err).Error("fsnotify watcher failed")
+		return
+	}
+	// A watch on the file itself is lost when the file is replaced
+	// by rename (as many editors and deployment tools do), so also
+	// watch its directory. This is best-effort: if it fails, the
+	// file watch above still reports in-place writes.
+	err = watcher.Add(filepath.Dir(cfgPath))
+	if err != nil {
+		logger.WithError(err).Warn("fsnotify could not watch config directory; replacement of the config file may go unnoticed")
+	}
+	cleanPath := filepath.Clean(cfgPath)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case err, ok := <-watcher.Errors:
+			if !ok {
+				return
+			}
+			logger.WithError(err).Warn("fsnotify watcher reported error")
+		case ev, ok := <-watcher.Events:
+			if !ok {
+				return
+			}
+			if filepath.Clean(ev.Name) != cleanPath {
+				// Event for some other entry in the watched directory.
+				continue
+			}
+			// One reload covers every event queued so far.
+			for len(watcher.Events) > 0 {
+				<-watcher.Events
+			}
+			loader := config.NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard})
+			loader.Path = cfgPath
+			loader.SkipAPICalls = true
+			cfg, err := loader.Load()
+			if err != nil {
+				logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now")
+			} else if reflect.DeepEqual(cfg, prevcfg) {
+				logger.Debug("config file changed but is still DeepEqual to the existing config")
+			} else {
+				logger.Debug("config changed, notifying supervisor")
+				fn()
+				prevcfg = cfg
+			}
+		}
+	}
+}