X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8975e5ccaa3d39f611dec459f066181277f03454..64eac5879fe80f9ad52665421962740390a14eee:/lib/boot/supervisor.go diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go index de570a9520..3484a1444e 100644 --- a/lib/boot/supervisor.go +++ b/lib/boot/supervisor.go @@ -19,22 +19,24 @@ import ( "os/signal" "os/user" "path/filepath" + "reflect" "strings" "sync" "syscall" "time" + "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/lib/service" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/health" + "github.com/fsnotify/fsnotify" "github.com/sirupsen/logrus" ) type Supervisor struct { SourcePath string // e.g., /home/username/src/arvados SourceVersion string // e.g., acbd1324... - LibPath string // e.g., /var/lib/arvados ClusterType string // e.g., production ListenHost string // e.g., localhost ControllerAddr string // e.g., 127.0.0.1:8000 @@ -46,7 +48,8 @@ type Supervisor struct { ctx context.Context cancel context.CancelFunc - done chan struct{} + done chan struct{} // closed when child procs/services have shut down + err error // error that caused shutdown (valid when done is closed) healthChecker *health.Aggregator tasksReady map[string]chan bool waitShutdown sync.WaitGroup @@ -56,30 +59,66 @@ type Supervisor struct { environ []string // for child processes } -func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config) { +func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) { super.ctx, super.cancel = context.WithCancel(ctx) super.done = make(chan struct{}) go func() { + defer close(super.done) + sigch := make(chan os.Signal) signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(sigch) go func() { for sig := range sigch { super.logger.WithField("signal", sig).Info("caught signal") + if super.err == nil { + super.err = fmt.Errorf("caught signal %s", sig) + } + super.cancel() + } + }() + + hupch := make(chan os.Signal) + signal.Notify(hupch, syscall.SIGHUP) + defer signal.Stop(hupch) + go func() { + for sig := range hupch { + super.logger.WithField("signal", sig).Info("caught signal") + if super.err == nil { + super.err = errNeedConfigReload + } super.cancel() } }() + if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig { + go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() { + if super.err == nil { + super.err = errNeedConfigReload + } + super.cancel() + }) + } + err := super.run(cfg) if err != nil { - fmt.Fprintln(super.Stderr, err) + super.logger.WithError(err).Warn("supervisor shut down") + if super.err == nil { + super.err = err + } } - close(super.done) }() } +func (super *Supervisor) Wait() error { + <-super.done + return super.err +} + func (super *Supervisor) run(cfg *arvados.Config) error { + defer super.cancel() + cwd, err := os.Getwd() if err != nil { return err @@ -97,17 +136,17 @@ func (super *Supervisor) run(cfg *arvados.Config) error { return err } defer os.RemoveAll(super.tempdir) - if err := os.Mkdir(filepath.Join(super.tempdir, "bin"), 0777); err != nil { + if err := os.Mkdir(filepath.Join(super.tempdir, "bin"), 0755); err != nil { return err } // Fill in any missing config keys, and write the resulting // config in the temp dir for child services to use. - err = super.autofillConfig(cfg, super.logger) + err = super.autofillConfig(cfg) if err != nil { return err } - conffile, err := os.OpenFile(filepath.Join(super.tempdir, "config.yml"), os.O_CREATE|os.O_WRONLY, 0777) + conffile, err := os.OpenFile(filepath.Join(super.tempdir, "config.yml"), os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } @@ -123,12 +162,11 @@ func (super *Supervisor) run(cfg *arvados.Config) error { super.configfile = conffile.Name() super.environ = os.Environ() - super.cleanEnv() + super.cleanEnv([]string{"ARVADOS_"}) super.setEnv("ARVADOS_CONFIG", super.configfile) super.setEnv("RAILS_ENV", super.ClusterType) super.setEnv("TMPDIR", super.tempdir) - super.prependEnv("PATH", filepath.Join(super.tempdir, "bin")+":") - super.prependEnv("PATH", filepath.Join(super.LibPath, "bin")+":") + super.prependEnv("PATH", super.tempdir+"/bin:/var/lib/arvados/bin:") super.cluster, err = cfg.GetCluster("") if err != nil { @@ -164,16 +202,7 @@ func (super *Supervisor) run(cfg *arvados.Config) error { } else { return errors.New("specifying a version to run is not yet supported") } - for _, dir := range []string{super.LibPath, filepath.Join(super.LibPath, "bin")} { - if _, err = os.Stat(filepath.Join(dir, ".")); os.IsNotExist(err) { - err = os.Mkdir(dir, 0755) - if err != nil { - return err - } - } else if err != nil { - return err - } - } + _, err = super.installGoProgram(super.ctx, "cmd/arvados-server") if err != nil { return err @@ -193,7 +222,7 @@ func (super *Supervisor) run(cfg *arvados.Config) error { runGoProgram{src: "services/keepproxy", svc: super.cluster.Services.Keepproxy, depends: []supervisedTask{runPassenger{src: "services/api"}}}, runGoProgram{src: "services/keepstore", svc: super.cluster.Services.Keepstore}, runGoProgram{src: "services/keep-web", svc: super.cluster.Services.WebDAV}, - runGoProgram{src: "services/ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{runPostgreSQL{}}}, + runServiceCommand{name: "ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{runPostgreSQL{}}}, installPassenger{src: "services/api"}, runPassenger{src: "services/api", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, runPostgreSQL{}, installPassenger{src: "services/api"}}}, installPassenger{src: "apps/workbench", depends: []supervisedTask{installPassenger{src: "services/api"}}}, // dependency ensures workbench doesn't delay api startup @@ -307,17 +336,11 @@ func (super *Supervisor) prependEnv(key, prepend string) { super.environ = append(super.environ, key+"="+prepend) } -var cleanEnvPrefixes = []string{ - "GEM_HOME=", - "GEM_PATH=", - "ARVADOS_", -} - -func (super *Supervisor) cleanEnv() { +func (super *Supervisor) cleanEnv(prefixes []string) { var cleaned []string for _, s := range super.environ { drop := false - for _, p := range cleanEnvPrefixes { + for _, p := range prefixes { if strings.HasPrefix(s, p) { drop = true break @@ -365,17 +388,33 @@ func (super *Supervisor) installGoProgram(ctx context.Context, srcpath string) ( return binfile, err } +func (super *Supervisor) usingRVM() bool { + return os.Getenv("rvm_path") != "" +} + func (super *Supervisor) setupRubyEnv() error { - cmd := exec.Command("gem", "env", "gempath") - cmd.Env = super.environ - buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:... - if err != nil || len(buf) == 0 { - return fmt.Errorf("gem env gempath: %v", err) - } - gempath := string(bytes.Split(buf, []byte{':'})[0]) - super.prependEnv("PATH", gempath+"/bin:") - super.setEnv("GEM_HOME", gempath) - super.setEnv("GEM_PATH", gempath) + if !super.usingRVM() { + // (If rvm is in use, assume the caller has everything + // set up as desired) + super.cleanEnv([]string{ + "GEM_HOME=", + "GEM_PATH=", + }) + gem := "gem" + if _, err := os.Stat("/var/lib/arvados/bin/gem"); err == nil { + gem = "/var/lib/arvados/bin/gem" + } + cmd := exec.Command(gem, "env", "gempath") + cmd.Env = super.environ + buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:... + if err != nil || len(buf) == 0 { + return fmt.Errorf("gem env gempath: %v", err) + } + gempath := string(bytes.Split(buf, []byte{':'})[0]) + super.prependEnv("PATH", gempath+"/bin:") + super.setEnv("GEM_HOME", gempath) + super.setEnv("GEM_PATH", gempath) + } // Passenger install doesn't work unless $HOME is ~user u, err := user.Current() if err != nil { @@ -411,7 +450,11 @@ func (super *Supervisor) RunProgram(ctx context.Context, dir string, output io.W cmdline := fmt.Sprintf("%s", append([]string{prog}, args...)) super.logger.WithField("command", cmdline).WithField("dir", dir).Info("executing") - logprefix := strings.TrimPrefix(prog, super.tempdir+"/bin/") + logprefix := prog + if logprefix == "setuidgid" && len(args) >= 3 { + logprefix = args[2] + } + logprefix = strings.TrimPrefix(logprefix, super.tempdir+"/bin/") if logprefix == "bundle" && len(args) > 2 && args[0] == "exec" { logprefix = args[1] } else if logprefix == "arvados-server" && len(args) > 1 { @@ -495,15 +538,15 @@ func (super *Supervisor) RunProgram(ctx context.Context, dir string, output io.W return nil } -func (super *Supervisor) autofillConfig(cfg *arvados.Config, log logrus.FieldLogger) error { +func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { cluster, err := cfg.GetCluster("") if err != nil { return err } usedPort := map[string]bool{} - nextPort := func() string { + nextPort := func(host string) string { for { - port, err := availablePort(super.ListenHost) + port, err := availablePort(host) if err != nil { panic(err) } @@ -523,13 +566,9 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config, log logrus.FieldLog h = super.ListenHost } if p == "0" { - p, err = availablePort(h) - if err != nil { - return err - } - usedPort[p] = true + p = nextPort(h) } - cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p)} + cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"} } for _, svc := range []*arvados.Service{ &cluster.Services.Controller, @@ -547,18 +586,22 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config, log logrus.FieldLog if svc == &cluster.Services.DispatchCloud && super.ClusterType == "test" { continue } - if svc.ExternalURL.Host == "" && (svc == &cluster.Services.Controller || - svc == &cluster.Services.GitHTTP || - svc == &cluster.Services.Keepproxy || - svc == &cluster.Services.WebDAV || - svc == &cluster.Services.WebDAVDownload || - svc == &cluster.Services.Websocket || - svc == &cluster.Services.Workbench1) { - svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort())} + if svc.ExternalURL.Host == "" { + if svc == &cluster.Services.Controller || + svc == &cluster.Services.GitHTTP || + svc == &cluster.Services.Health || + svc == &cluster.Services.Keepproxy || + svc == &cluster.Services.WebDAV || + svc == &cluster.Services.WebDAVDownload || + svc == &cluster.Services.Workbench1 { + svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"} + } else if svc == &cluster.Services.Websocket { + svc.ExternalURL = arvados.URL{Scheme: "wss", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/websocket"} + } } if len(svc.InternalURLs) == 0 { svc.InternalURLs = map[arvados.URL]arvados.ServiceInstance{ - arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort())}: arvados.ServiceInstance{}, + {Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}: {}, } } } @@ -574,6 +617,10 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config, log logrus.FieldLog if cluster.Collections.BlobSigningKey == "" { cluster.Collections.BlobSigningKey = randomHexString(64) } + if cluster.Users.AnonymousUserToken == "" { + cluster.Users.AnonymousUserToken = randomHexString(64) + } + if super.ClusterType != "production" && cluster.Containers.DispatchPrivateKey == "" { buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch")) if err != nil { @@ -586,7 +633,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config, log logrus.FieldLog } if super.ClusterType == "test" { // Add a second keepstore process. - cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort())}] = arvados.ServiceInstance{} + cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}] = arvados.ServiceInstance{} // Create a directory-backed volume for each keepstore // process. @@ -597,7 +644,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config, log logrus.FieldLog if _, err = os.Stat(datadir + "/."); err == nil { } else if !os.IsNotExist(err) { return err - } else if err = os.Mkdir(datadir, 0777); err != nil { + } else if err = os.Mkdir(datadir, 0755); err != nil { return err } cluster.Volumes[fmt.Sprintf(cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{ @@ -613,7 +660,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config, log logrus.FieldLog cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{ "client_encoding": "utf8", "host": "localhost", - "port": nextPort(), + "port": nextPort(super.ListenHost), "dbname": "arvados_test", "user": "arvados", "password": "insecure_arvados_test", @@ -624,6 +671,19 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config, log logrus.FieldLog return nil } +func addrIsLocal(addr string) (bool, error) { + return true, nil + listener, err := net.Listen("tcp", addr) + if err == nil { + listener.Close() + return true, nil + } else if strings.Contains(err.Error(), "cannot assign requested address") { + return false, nil + } else { + return false, err + } +} + func randomHexString(chars int) string { b := make([]byte, chars/2) _, err := rand.Read(b) @@ -634,6 +694,9 @@ func randomHexString(chars int) string { } func internalPort(svc arvados.Service) (string, error) { + if len(svc.InternalURLs) > 1 { + return "", errors.New("internalPort() doesn't work with multiple InternalURLs") + } for u := range svc.InternalURLs { if _, p, err := net.SplitHostPort(u.Host); err != nil { return "", err @@ -688,3 +751,67 @@ func waitForConnect(ctx context.Context, addr string) error { } return ctx.Err() } + +func copyConfig(cfg *arvados.Config) *arvados.Config { + pr, pw := io.Pipe() + go func() { + err := json.NewEncoder(pw).Encode(cfg) + if err != nil { + panic(err) + } + pw.Close() + }() + cfg2 := new(arvados.Config) + err := json.NewDecoder(pr).Decode(cfg2) + if err != nil { + panic(err) + } + return cfg2 +} + +func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, fn func()) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + logger.WithError(err).Error("fsnotify setup failed") + return + } + defer watcher.Close() + + err = watcher.Add(cfgPath) + if err != nil { + logger.WithError(err).Error("fsnotify watcher failed") + return + } + + for { + select { + case <-ctx.Done(): + return + case err, ok := <-watcher.Errors: + if !ok { + return + } + logger.WithError(err).Warn("fsnotify watcher reported error") + case _, ok := <-watcher.Events: + if !ok { + return + } + for len(watcher.Events) > 0 { + <-watcher.Events + } + loader := config.NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard}) + loader.Path = cfgPath + loader.SkipAPICalls = true + cfg, err := loader.Load() + if err != nil { + logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now") + } else if reflect.DeepEqual(cfg, prevcfg) { + logger.Debug("config file changed but is still DeepEqual to the existing config") + } else { + logger.Debug("config changed, notifying supervisor") + fn() + prevcfg = cfg + } + } + } +}