15954: Don't return from Stop() until child processes end.
[arvados.git] / lib / boot / cmd.go
index 04f6838b75e352afa8e4d0d8f4bbb2cdd0b87fa9..5f5bb1ee77b8c5fc8c209f447d6e44acddaab3a0 100644 (file)
@@ -17,6 +17,7 @@ import (
        "os"
        "os/exec"
        "os/signal"
+       "os/user"
        "path/filepath"
        "strings"
        "sync"
@@ -55,15 +56,6 @@ func (bootCommand) RunCommand(prog string, args []string, stdin io.Reader, stdou
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()
 
-       ch := make(chan os.Signal)
-       signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
-       go func() {
-               for sig := range ch {
-                       boot.logger.WithField("signal", sig).Info("caught signal")
-                       cancel()
-               }
-       }()
-
        var err error
        defer func() {
                if err != nil {
@@ -105,7 +97,8 @@ func (bootCommand) RunCommand(prog string, args []string, stdin io.Reader, stdou
        defer boot.Stop()
        if url, ok := boot.WaitReady(); ok {
                fmt.Fprintln(stdout, url)
-               <-ctx.Done() // wait for signal
+               // Wait for signal/crash + orderly shutdown
+               <-boot.done
                return 0
        } else {
                return 1
@@ -129,6 +122,7 @@ type Booter struct {
        done          chan struct{}
        healthChecker *health.Aggregator
        tasksReady    map[string]chan bool
+       waitShutdown  sync.WaitGroup
 
        tempdir    string
        configfile string
@@ -142,7 +136,18 @@ type Booter struct {
 func (boot *Booter) Start(ctx context.Context, cfg *arvados.Config) {
        boot.ctx, boot.cancel = context.WithCancel(ctx)
        boot.done = make(chan struct{})
+
        go func() {
+               sigch := make(chan os.Signal)
+               signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
+               defer signal.Stop(sigch)
+               go func() {
+                       for sig := range sigch {
+                               boot.logger.WithField("signal", sig).Info("caught signal")
+                               boot.cancel()
+                       }
+               }()
+
                err := boot.run(cfg)
                if err != nil {
                        fmt.Fprintln(boot.Stderr, err)
@@ -192,8 +197,10 @@ func (boot *Booter) run(cfg *arvados.Config) error {
        boot.configfile = conffile.Name()
 
        boot.environ = os.Environ()
+       boot.cleanEnv()
        boot.setEnv("ARVADOS_CONFIG", boot.configfile)
        boot.setEnv("RAILS_ENV", boot.ClusterType)
+       boot.setEnv("TMPDIR", boot.tempdir)
        boot.prependEnv("PATH", filepath.Join(boot.LibPath, "bin")+":")
 
        boot.cluster, err = cfg.GetCluster("")
@@ -209,7 +216,6 @@ func (boot *Booter) run(cfg *arvados.Config) error {
        boot.logger = ctxlog.New(boot.Stderr, boot.cluster.SystemLogs.Format, loglevel).WithFields(logrus.Fields{
                "PID": os.Getpid(),
        })
-       boot.healthChecker = &health.Aggregator{Cluster: boot.cluster}
 
        for _, dir := range []string{boot.LibPath, filepath.Join(boot.LibPath, "bin")} {
                if _, err = os.Stat(filepath.Join(dir, ".")); os.IsNotExist(err) {
@@ -280,7 +286,11 @@ func (boot *Booter) run(cfg *arvados.Config) error {
        if err != nil {
                return err
        }
+       boot.logger.Info("all startup tasks are complete; starting health checks")
+       boot.healthChecker = &health.Aggregator{Cluster: boot.cluster}
        <-boot.ctx.Done()
+       boot.logger.Info("shutting down")
+       boot.waitShutdown.Wait()
        return boot.ctx.Err()
 }
 
@@ -293,7 +303,9 @@ func (boot *Booter) wait(ctx context.Context, tasks ...bootTask) error {
                boot.logger.WithField("task", task.String()).Info("waiting")
                select {
                case <-ch:
+                       boot.logger.WithField("task", task.String()).Info("ready")
                case <-ctx.Done():
+                       boot.logger.WithField("task", task.String()).Info("task was never ready")
                        return ctx.Err()
                }
        }
@@ -306,9 +318,12 @@ func (boot *Booter) Stop() {
 }
 
 func (boot *Booter) WaitReady() (*arvados.URL, bool) {
+       ticker := time.NewTicker(time.Second)
+       defer ticker.Stop()
        for waiting := true; waiting; {
-               time.Sleep(time.Second)
-               if boot.ctx.Err() != nil {
+               select {
+               case <-ticker.C:
+               case <-boot.ctx.Done():
                        return nil, false
                }
                if boot.healthChecker == nil {
@@ -343,6 +358,29 @@ func (boot *Booter) prependEnv(key, prepend string) {
        boot.environ = append(boot.environ, key+"="+prepend)
 }
 
+var cleanEnvPrefixes = []string{
+       "GEM_HOME=",
+       "GEM_PATH=",
+       "ARVADOS_",
+}
+
+func (boot *Booter) cleanEnv() {
+       var cleaned []string
+       for _, s := range boot.environ {
+               drop := false
+               for _, p := range cleanEnvPrefixes {
+                       if strings.HasPrefix(s, p) {
+                               drop = true
+                               break
+                       }
+               }
+               if !drop {
+                       cleaned = append(cleaned, s)
+               }
+       }
+       boot.environ = cleaned
+}
+
 func (boot *Booter) setEnv(key, val string) {
        for i, s := range boot.environ {
                if strings.HasPrefix(s, key+"=") {
@@ -360,7 +398,9 @@ func (boot *Booter) installGoProgram(ctx context.Context, srcpath string) error
 }
 
 func (boot *Booter) setupRubyEnv() error {
-       buf, err := exec.Command("gem", "env", "gempath").Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:...
+       cmd := exec.Command("gem", "env", "gempath")
+       cmd.Env = boot.environ
+       buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:...
        if err != nil || len(buf) == 0 {
                return fmt.Errorf("gem env gempath: %v", err)
        }
@@ -368,6 +408,12 @@ func (boot *Booter) setupRubyEnv() error {
        boot.prependEnv("PATH", gempath+"/bin:")
        boot.setEnv("GEM_HOME", gempath)
        boot.setEnv("GEM_PATH", gempath)
+       // Passenger install doesn't work unless $HOME is ~user
+       u, err := user.Current()
+       if err != nil {
+               return err
+       }
+       boot.setEnv("HOME", u.HomeDir)
        return nil
 }
 
@@ -404,15 +450,24 @@ func (boot *Booter) RunProgram(ctx context.Context, dir string, output io.Writer
        if !strings.HasPrefix(dir, "/") {
                logprefix = dir + ": " + logprefix
        }
-       stderr := &logPrefixer{Writer: boot.Stderr, Prefix: []byte("[" + logprefix + "] ")}
 
        cmd := exec.Command(boot.lookPath(prog), args...)
+       stdout, err := cmd.StdoutPipe()
+       if err != nil {
+               return err
+       }
+       stderr, err := cmd.StderrPipe()
+       if err != nil {
+               return err
+       }
+       logwriter := &logPrefixer{Writer: boot.Stderr, Prefix: []byte("[" + logprefix + "] ")}
+       go io.Copy(logwriter, stderr)
        if output == nil {
-               cmd.Stdout = stderr
+               go io.Copy(logwriter, stdout)
        } else {
-               cmd.Stdout = output
+               go io.Copy(output, stdout)
        }
-       cmd.Stderr = stderr
+
        if strings.HasPrefix(dir, "/") {
                cmd.Dir = dir
        } else {
@@ -434,13 +489,19 @@ func (boot *Booter) RunProgram(ctx context.Context, dir string, output io.Writer
                                cmd.Process.Signal(syscall.SIGTERM)
                                time.Sleep(5 * time.Second)
                                if !exited {
+                                       stdout.Close()
+                                       stderr.Close()
                                        log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM")
                                }
                        }
                }
        }()
 
-       err := cmd.Run()
+       err = cmd.Start()
+       if err != nil {
+               return err
+       }
+       err = cmd.Wait()
        if err != nil && ctx.Err() == nil {
                // Only report errors that happen before the context ends.
                return fmt.Errorf("%s: error: %v", cmdline, err)