defer boot.Stop()
if url, ok := boot.WaitReady(); ok {
fmt.Fprintln(stdout, url)
- <-ctx.Done() // wait for signal
+ // Wait for signal/crash + orderly shutdown
+ <-boot.done
return 0
} else {
return 1
done chan struct{}
healthChecker *health.Aggregator
tasksReady map[string]chan bool
+ waitShutdown sync.WaitGroup
tempdir string
configfile string
boot.healthChecker = &health.Aggregator{Cluster: boot.cluster}
<-boot.ctx.Done()
boot.logger.Info("shutting down")
+ boot.waitShutdown.Wait()
return boot.ctx.Err()
}
}
func (boot *Booter) WaitReady() (*arvados.URL, bool) {
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
for waiting := true; waiting; {
- time.Sleep(time.Second)
- if boot.ctx.Err() != nil {
+ select {
+ case <-ticker.C:
+ case <-boot.ctx.Done():
return nil, false
}
if boot.healthChecker == nil {
if !strings.HasPrefix(dir, "/") {
logprefix = dir + ": " + logprefix
}
- stderr := &logPrefixer{Writer: boot.Stderr, Prefix: []byte("[" + logprefix + "] ")}
cmd := exec.Command(boot.lookPath(prog), args...)
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return err
+ }
+ stderr, err := cmd.StderrPipe()
+ if err != nil {
+ return err
+ }
+ logwriter := &logPrefixer{Writer: boot.Stderr, Prefix: []byte("[" + logprefix + "] ")}
+ go io.Copy(logwriter, stderr)
if output == nil {
- cmd.Stdout = stderr
+ go io.Copy(logwriter, stdout)
} else {
- cmd.Stdout = output
+ go io.Copy(output, stdout)
}
- cmd.Stderr = stderr
+
if strings.HasPrefix(dir, "/") {
cmd.Dir = dir
} else {
cmd.Process.Signal(syscall.SIGTERM)
time.Sleep(5 * time.Second)
if !exited {
+ stdout.Close()
+ stderr.Close()
log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM")
}
}
}
}()
- err := cmd.Run()
+ err = cmd.Start()
+ if err != nil {
+ return err
+ }
+ err = cmd.Wait()
if err != nil && ctx.Err() == nil {
// Only report errors that happen before the context ends.
return fmt.Errorf("%s: error: %v", cmdline, err)