15954: Don't return from Stop() until child processes end.
[arvados.git] / lib / boot / cmd.go
index 78cfc7b87542e07bd4f4c5aaa639e88cec234dd2..5f5bb1ee77b8c5fc8c209f447d6e44acddaab3a0 100644 (file)
@@ -97,7 +97,8 @@ func (bootCommand) RunCommand(prog string, args []string, stdin io.Reader, stdou
        defer boot.Stop()
        if url, ok := boot.WaitReady(); ok {
                fmt.Fprintln(stdout, url)
-               <-ctx.Done() // wait for signal
+               // Wait for signal/crash + orderly shutdown
+               <-boot.done
                return 0
        } else {
                return 1
@@ -121,6 +122,7 @@ type Booter struct {
        done          chan struct{}
        healthChecker *health.Aggregator
        tasksReady    map[string]chan bool
+       waitShutdown  sync.WaitGroup
 
        tempdir    string
        configfile string
@@ -288,6 +290,7 @@ func (boot *Booter) run(cfg *arvados.Config) error {
        boot.healthChecker = &health.Aggregator{Cluster: boot.cluster}
        <-boot.ctx.Done()
        boot.logger.Info("shutting down")
+       boot.waitShutdown.Wait()
        return boot.ctx.Err()
 }
 
@@ -315,9 +318,12 @@ func (boot *Booter) Stop() {
 }
 
 func (boot *Booter) WaitReady() (*arvados.URL, bool) {
+       ticker := time.NewTicker(time.Second)
+       defer ticker.Stop()
        for waiting := true; waiting; {
-               time.Sleep(time.Second)
-               if boot.ctx.Err() != nil {
+               select {
+               case <-ticker.C:
+               case <-boot.ctx.Done():
                        return nil, false
                }
                if boot.healthChecker == nil {
@@ -444,15 +450,24 @@ func (boot *Booter) RunProgram(ctx context.Context, dir string, output io.Writer
        if !strings.HasPrefix(dir, "/") {
                logprefix = dir + ": " + logprefix
        }
-       stderr := &logPrefixer{Writer: boot.Stderr, Prefix: []byte("[" + logprefix + "] ")}
 
        cmd := exec.Command(boot.lookPath(prog), args...)
+       stdout, err := cmd.StdoutPipe()
+       if err != nil {
+               return err
+       }
+       stderr, err := cmd.StderrPipe()
+       if err != nil {
+               return err
+       }
+       logwriter := &logPrefixer{Writer: boot.Stderr, Prefix: []byte("[" + logprefix + "] ")}
+       go io.Copy(logwriter, stderr)
        if output == nil {
-               cmd.Stdout = stderr
+               go io.Copy(logwriter, stdout)
        } else {
-               cmd.Stdout = output
+               go io.Copy(output, stdout)
        }
-       cmd.Stderr = stderr
+
        if strings.HasPrefix(dir, "/") {
                cmd.Dir = dir
        } else {
@@ -474,13 +489,19 @@ func (boot *Booter) RunProgram(ctx context.Context, dir string, output io.Writer
                                cmd.Process.Signal(syscall.SIGTERM)
                                time.Sleep(5 * time.Second)
                                if !exited {
+                                       stdout.Close()
+                                       stderr.Close()
                                        log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM")
                                }
                        }
                }
        }()
 
-       err := cmd.Run()
+       err = cmd.Start()
+       if err != nil {
+               return err
+       }
+       err = cmd.Wait()
        if err != nil && ctx.Err() == nil {
                // Only report errors that happen before the context ends.
                return fmt.Errorf("%s: error: %v", cmdline, err)