X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f1c8e02eea0adcf49cba0117482a10b09d778724..3aaefcb3c76ff470b475d950398d01255e87712a:/lib/boot/supervisor.go diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go index 4a4ab4d98b..20576b6b97 100644 --- a/lib/boot/supervisor.go +++ b/lib/boot/supervisor.go @@ -14,20 +14,24 @@ import ( "io" "io/ioutil" "net" + "net/url" "os" "os/exec" "os/signal" "os/user" "path/filepath" + "reflect" "strings" "sync" "syscall" "time" + "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/lib/service" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/health" + "github.com/fsnotify/fsnotify" "github.com/sirupsen/logrus" ) @@ -45,40 +49,79 @@ type Supervisor struct { ctx context.Context cancel context.CancelFunc - done chan struct{} + done chan struct{} // closed when child procs/services have shut down + err error // error that caused shutdown (valid when done is closed) healthChecker *health.Aggregator tasksReady map[string]chan bool waitShutdown sync.WaitGroup + bindir string tempdir string + wwwtempdir string configfile string environ []string // for child processes } -func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config) { +func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) { super.ctx, super.cancel = context.WithCancel(ctx) super.done = make(chan struct{}) go func() { + defer close(super.done) + sigch := make(chan os.Signal) signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(sigch) go func() { for sig := range sigch { super.logger.WithField("signal", sig).Info("caught signal") + if super.err == nil { + super.err = fmt.Errorf("caught signal %s", sig) + } super.cancel() } }() + hupch := make(chan os.Signal) + signal.Notify(hupch, syscall.SIGHUP) + defer signal.Stop(hupch) + go func() { + for sig := range hupch { + super.logger.WithField("signal", sig).Info("caught signal") + if super.err == nil { + super.err = errNeedConfigReload + } + super.cancel() + } + }() + + if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig { + go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() { + if super.err == nil { + super.err = errNeedConfigReload + } + super.cancel() + }) + } + err := super.run(cfg) if err != nil { super.logger.WithError(err).Warn("supervisor shut down") + if super.err == nil { + super.err = err + } } - close(super.done) }() } +func (super *Supervisor) Wait() error { + <-super.done + return super.err +} + func (super *Supervisor) run(cfg *arvados.Config) error { + defer super.cancel() + cwd, err := os.Getwd() if err != nil { return err @@ -91,13 +134,26 @@ func (super *Supervisor) run(cfg *arvados.Config) error { return err } - super.tempdir, err = ioutil.TempDir("", "arvados-server-boot-") - if err != nil { - return err - } - defer os.RemoveAll(super.tempdir) - if err := os.Mkdir(filepath.Join(super.tempdir, "bin"), 0755); err != nil { - return err + // Choose bin and temp dirs: /var/lib/arvados/... in + // production, transient tempdir otherwise. + if super.ClusterType == "production" { + // These dirs have already been created by + // "arvados-server install" (or by extracting a + // package). + super.tempdir = "/var/lib/arvados/tmp" + super.wwwtempdir = "/var/lib/arvados/wwwtmp" + super.bindir = "/var/lib/arvados/bin" + } else { + super.tempdir, err = ioutil.TempDir("", "arvados-server-boot-") + if err != nil { + return err + } + defer os.RemoveAll(super.tempdir) + super.wwwtempdir = super.tempdir + super.bindir = filepath.Join(super.tempdir, "bin") + if err := os.Mkdir(super.bindir, 0755); err != nil { + return err + } } // Fill in any missing config keys, and write the resulting @@ -126,7 +182,10 @@ func (super *Supervisor) run(cfg *arvados.Config) error { super.setEnv("ARVADOS_CONFIG", super.configfile) super.setEnv("RAILS_ENV", super.ClusterType) super.setEnv("TMPDIR", super.tempdir) - super.prependEnv("PATH", filepath.Join(super.tempdir, "bin")+":") + super.prependEnv("PATH", "/var/lib/arvados/bin:") + if super.ClusterType != "production" { + super.prependEnv("PATH", super.tempdir+"/bin:") + } super.cluster, err = cfg.GetCluster("") if err != nil { @@ -142,7 +201,9 @@ func (super *Supervisor) run(cfg *arvados.Config) error { "PID": os.Getpid(), }) - if super.SourceVersion == "" { + if super.SourceVersion == "" && super.ClusterType == "production" { + // don't need SourceVersion + } else if super.SourceVersion == "" { // Find current source tree version. var buf bytes.Buffer err = super.RunProgram(super.ctx, ".", &buf, nil, "git", "diff", "--shortstat") @@ -182,17 +243,17 @@ func (super *Supervisor) run(cfg *arvados.Config) error { runGoProgram{src: "services/keepproxy", svc: super.cluster.Services.Keepproxy, depends: []supervisedTask{runPassenger{src: "services/api"}}}, runGoProgram{src: "services/keepstore", svc: super.cluster.Services.Keepstore}, runGoProgram{src: "services/keep-web", svc: super.cluster.Services.WebDAV}, - runGoProgram{src: "services/ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{runPostgreSQL{}}}, + runServiceCommand{name: "ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{runPostgreSQL{}}}, installPassenger{src: "services/api"}, - runPassenger{src: "services/api", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, runPostgreSQL{}, installPassenger{src: "services/api"}}}, + runPassenger{src: "services/api", varlibdir: "railsapi", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, runPostgreSQL{}, installPassenger{src: "services/api"}}}, installPassenger{src: "apps/workbench", depends: []supervisedTask{installPassenger{src: "services/api"}}}, // dependency ensures workbench doesn't delay api startup - runPassenger{src: "apps/workbench", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench"}}}, + runPassenger{src: "apps/workbench", varlibdir: "workbench1", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench"}}}, seedDatabase{}, } if super.ClusterType != "test" { tasks = append(tasks, - runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.Controller}, - runGoProgram{src: "services/keep-balance"}, + runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud}, + runGoProgram{src: "services/keep-balance", svc: super.cluster.Services.Keepbalance}, ) } super.tasksReady = map[string]chan bool{} @@ -342,9 +403,11 @@ func dedupEnv(in []string) []string { func (super *Supervisor) installGoProgram(ctx context.Context, srcpath string) (string, error) { _, basename := filepath.Split(srcpath) - bindir := filepath.Join(super.tempdir, "bin") - binfile := filepath.Join(bindir, basename) - err := super.RunProgram(ctx, filepath.Join(super.SourcePath, srcpath), nil, []string{"GOBIN=" + bindir}, "go", "install", "-ldflags", "-X git.arvados.org/arvados.git/lib/cmd.version="+super.SourceVersion+" -X main.version="+super.SourceVersion) + binfile := filepath.Join(super.bindir, basename) + if super.ClusterType == "production" { + return binfile, nil + } + err := super.RunProgram(ctx, filepath.Join(super.SourcePath, srcpath), nil, []string{"GOBIN=" + super.bindir}, "go", "install", "-ldflags", "-X git.arvados.org/arvados.git/lib/cmd.version="+super.SourceVersion+" -X main.version="+super.SourceVersion) return binfile, err } @@ -360,7 +423,20 @@ func (super *Supervisor) setupRubyEnv() error { "GEM_HOME=", "GEM_PATH=", }) - cmd := exec.Command("gem", "env", "gempath") + gem := "gem" + if _, err := os.Stat("/var/lib/arvados/bin/gem"); err == nil || super.ClusterType == "production" { + gem = "/var/lib/arvados/bin/gem" + } + cmd := exec.Command(gem, "env", "gempath") + if super.ClusterType == "production" { + cmd.Args = append([]string{"sudo", "-u", "www-data", "-E", "HOME=/var/www"}, cmd.Args...) + path, err := exec.LookPath("sudo") + if err != nil { + return fmt.Errorf("LookPath(\"sudo\"): %w", err) + } + cmd.Path = path + } + cmd.Stderr = super.Stderr cmd.Env = super.environ buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:... if err != nil || len(buf) == 0 { @@ -394,9 +470,9 @@ func (super *Supervisor) lookPath(prog string) string { return prog } -// Run prog with args, using dir as working directory. If ctx is -// cancelled while the child is running, RunProgram terminates the -// child, waits for it to exit, then returns. +// RunProgram runs prog with args, using dir as working directory. If ctx is +// cancelled while the child is running, RunProgram terminates the child, waits +// for it to exit, then returns. // // Child's environment will have our env vars, plus any given in env. // @@ -406,14 +482,35 @@ func (super *Supervisor) RunProgram(ctx context.Context, dir string, output io.W cmdline := fmt.Sprintf("%s", append([]string{prog}, args...)) super.logger.WithField("command", cmdline).WithField("dir", dir).Info("executing") - logprefix := strings.TrimPrefix(prog, super.tempdir+"/bin/") - if logprefix == "bundle" && len(args) > 2 && args[0] == "exec" { - logprefix = args[1] - } else if logprefix == "arvados-server" && len(args) > 1 { - logprefix = args[0] - } - if !strings.HasPrefix(dir, "/") { - logprefix = dir + ": " + logprefix + logprefix := prog + { + if logprefix == "setuidgid" && len(args) >= 3 { + logprefix = args[2] + } + innerargs := args + if logprefix == "sudo" { + for i := 0; i < len(args); i++ { + if args[i] == "-u" { + i++ + } else if args[i] == "-E" || strings.Contains(args[i], "=") { + } else { + logprefix = args[i] + innerargs = args[i+1:] + break + } + } + } + logprefix = strings.TrimPrefix(logprefix, "/var/lib/arvados/bin/") + logprefix = strings.TrimPrefix(logprefix, super.tempdir+"/bin/") + if logprefix == "bundle" && len(innerargs) > 2 && innerargs[0] == "exec" { + _, dirbase := filepath.Split(dir) + logprefix = innerargs[1] + "@" + dirbase + } else if logprefix == "arvados-server" && len(args) > 1 { + logprefix = args[0] + } + if !strings.HasPrefix(dir, "/") { + logprefix = dir + ": " + logprefix + } } cmd := exec.Command(super.lookPath(prog), args...) @@ -520,7 +617,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { if p == "0" { p = nextPort(h) } - cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p)} + cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"} } for _, svc := range []*arvados.Service{ &cluster.Services.Controller, @@ -538,46 +635,50 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { if svc == &cluster.Services.DispatchCloud && super.ClusterType == "test" { continue } - if svc.ExternalURL.Host == "" && (svc == &cluster.Services.Controller || - svc == &cluster.Services.GitHTTP || - svc == &cluster.Services.Keepproxy || - svc == &cluster.Services.WebDAV || - svc == &cluster.Services.WebDAVDownload || - svc == &cluster.Services.Websocket || - svc == &cluster.Services.Workbench1) { - svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost))} + if svc.ExternalURL.Host == "" { + if svc == &cluster.Services.Controller || + svc == &cluster.Services.GitHTTP || + svc == &cluster.Services.Health || + svc == &cluster.Services.Keepproxy || + svc == &cluster.Services.WebDAV || + svc == &cluster.Services.WebDAVDownload || + svc == &cluster.Services.Workbench1 { + svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"} + } else if svc == &cluster.Services.Websocket { + svc.ExternalURL = arvados.URL{Scheme: "wss", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/websocket"} + } } if len(svc.InternalURLs) == 0 { svc.InternalURLs = map[arvados.URL]arvados.ServiceInstance{ - arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost))}: arvados.ServiceInstance{}, + {Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}: {}, } } } - if cluster.SystemRootToken == "" { - cluster.SystemRootToken = randomHexString(64) - } - if cluster.ManagementToken == "" { - cluster.ManagementToken = randomHexString(64) - } - if cluster.API.RailsSessionSecretToken == "" { - cluster.API.RailsSessionSecretToken = randomHexString(64) - } - if cluster.Collections.BlobSigningKey == "" { - cluster.Collections.BlobSigningKey = randomHexString(64) - } - if super.ClusterType != "production" && cluster.Containers.DispatchPrivateKey == "" { - buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch")) - if err != nil { - return err - } - cluster.Containers.DispatchPrivateKey = string(buf) - } if super.ClusterType != "production" { + if cluster.SystemRootToken == "" { + cluster.SystemRootToken = randomHexString(64) + } + if cluster.ManagementToken == "" { + cluster.ManagementToken = randomHexString(64) + } + if cluster.API.RailsSessionSecretToken == "" { + cluster.API.RailsSessionSecretToken = randomHexString(64) + } + if cluster.Collections.BlobSigningKey == "" { + cluster.Collections.BlobSigningKey = randomHexString(64) + } + if cluster.Containers.DispatchPrivateKey == "" { + buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch")) + if err != nil { + return err + } + cluster.Containers.DispatchPrivateKey = string(buf) + } cluster.TLS.Insecure = true } if super.ClusterType == "test" { // Add a second keepstore process. - cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost))}] = arvados.ServiceInstance{} + cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}] = arvados.ServiceInstance{} // Create a directory-backed volume for each keepstore // process. @@ -642,11 +743,10 @@ func internalPort(svc arvados.Service) (string, error) { return "", errors.New("internalPort() doesn't work with multiple InternalURLs") } for u := range svc.InternalURLs { - if _, p, err := net.SplitHostPort(u.Host); err != nil { - return "", err - } else if p != "" { + u := url.URL(u) + if p := u.Port(); p != "" { return p, nil - } else if u.Scheme == "https" { + } else if u.Scheme == "https" || u.Scheme == "ws" { return "443", nil } else { return "80", nil @@ -656,11 +756,10 @@ func internalPort(svc arvados.Service) (string, error) { } func externalPort(svc arvados.Service) (string, error) { - if _, p, err := net.SplitHostPort(svc.ExternalURL.Host); err != nil { - return "", err - } else if p != "" { + u := url.URL(svc.ExternalURL) + if p := u.Port(); p != "" { return p, nil - } else if svc.ExternalURL.Scheme == "https" { + } else if u.Scheme == "https" || u.Scheme == "wss" { return "443", nil } else { return "80", nil @@ -695,3 +794,67 @@ func waitForConnect(ctx context.Context, addr string) error { } return ctx.Err() } + +func copyConfig(cfg *arvados.Config) *arvados.Config { + pr, pw := io.Pipe() + go func() { + err := json.NewEncoder(pw).Encode(cfg) + if err != nil { + panic(err) + } + pw.Close() + }() + cfg2 := new(arvados.Config) + err := json.NewDecoder(pr).Decode(cfg2) + if err != nil { + panic(err) + } + return cfg2 +} + +func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, fn func()) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + logger.WithError(err).Error("fsnotify setup failed") + return + } + defer watcher.Close() + + err = watcher.Add(cfgPath) + if err != nil { + logger.WithError(err).Error("fsnotify watcher failed") + return + } + + for { + select { + case <-ctx.Done(): + return + case err, ok := <-watcher.Errors: + if !ok { + return + } + logger.WithError(err).Warn("fsnotify watcher reported error") + case _, ok := <-watcher.Events: + if !ok { + return + } + for len(watcher.Events) > 0 { + <-watcher.Events + } + loader := config.NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard}) + loader.Path = cfgPath + loader.SkipAPICalls = true + cfg, err := loader.Load() + if err != nil { + logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now") + } else if reflect.DeepEqual(cfg, prevcfg) { + logger.Debug("config file changed but is still DeepEqual to the existing config") + } else { + logger.Debug("config changed, notifying supervisor") + fn() + prevcfg = cfg + } + } + } +}