"os"
"os/exec"
"os/signal"
+ "os/user"
"path/filepath"
"strings"
"sync"
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- ch := make(chan os.Signal)
- signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
- go func() {
- for sig := range ch {
- boot.logger.WithField("signal", sig).Info("caught signal")
- cancel()
- }
- }()
-
var err error
defer func() {
if err != nil {
defer boot.Stop()
if url, ok := boot.WaitReady(); ok {
fmt.Fprintln(stdout, url)
- <-ctx.Done() // wait for signal
+ // Wait for signal/crash + orderly shutdown
+ <-boot.done
return 0
} else {
return 1
done chan struct{}
healthChecker *health.Aggregator
tasksReady map[string]chan bool
+ waitShutdown sync.WaitGroup
tempdir string
configfile string
func (boot *Booter) Start(ctx context.Context, cfg *arvados.Config) {
boot.ctx, boot.cancel = context.WithCancel(ctx)
boot.done = make(chan struct{})
+
go func() {
+ sigch := make(chan os.Signal)
+ signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
+ defer signal.Stop(sigch)
+ go func() {
+ for sig := range sigch {
+ boot.logger.WithField("signal", sig).Info("caught signal")
+ boot.cancel()
+ }
+ }()
+
err := boot.run(cfg)
if err != nil {
fmt.Fprintln(boot.Stderr, err)
boot.configfile = conffile.Name()
boot.environ = os.Environ()
+ boot.cleanEnv()
boot.setEnv("ARVADOS_CONFIG", boot.configfile)
boot.setEnv("RAILS_ENV", boot.ClusterType)
+ boot.setEnv("TMPDIR", boot.tempdir)
boot.prependEnv("PATH", filepath.Join(boot.LibPath, "bin")+":")
boot.cluster, err = cfg.GetCluster("")
boot.logger = ctxlog.New(boot.Stderr, boot.cluster.SystemLogs.Format, loglevel).WithFields(logrus.Fields{
"PID": os.Getpid(),
})
- boot.healthChecker = &health.Aggregator{Cluster: boot.cluster}
for _, dir := range []string{boot.LibPath, filepath.Join(boot.LibPath, "bin")} {
if _, err = os.Stat(filepath.Join(dir, ".")); os.IsNotExist(err) {
if err != nil {
return err
}
+ boot.logger.Info("all startup tasks are complete; starting health checks")
+ boot.healthChecker = &health.Aggregator{Cluster: boot.cluster}
<-boot.ctx.Done()
+ boot.logger.Info("shutting down")
+ boot.waitShutdown.Wait()
return boot.ctx.Err()
}
boot.logger.WithField("task", task.String()).Info("waiting")
select {
case <-ch:
+ boot.logger.WithField("task", task.String()).Info("ready")
case <-ctx.Done():
+ boot.logger.WithField("task", task.String()).Info("task was never ready")
return ctx.Err()
}
}
}
func (boot *Booter) WaitReady() (*arvados.URL, bool) {
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
for waiting := true; waiting; {
- time.Sleep(time.Second)
- if boot.ctx.Err() != nil {
+ select {
+ case <-ticker.C:
+ case <-boot.ctx.Done():
return nil, false
}
if boot.healthChecker == nil {
boot.environ = append(boot.environ, key+"="+prepend)
}
+// cleanEnvPrefixes lists the prefixes of "KEY=value" environment
+// entries that cleanEnv strips from boot.environ. Entries ending in
+// "=" match one exact variable name; "ARVADOS_" has no "=" and so
+// matches every variable whose name starts with ARVADOS_.
+var cleanEnvPrefixes = []string{"GEM_HOME=", "GEM_PATH=", "ARVADOS_"}
+
+// cleanEnv rebuilds boot.environ without any entry that begins with
+// one of cleanEnvPrefixes. Surviving entries keep their original
+// relative order; if nothing survives, boot.environ ends up nil.
+func (boot *Booter) cleanEnv() {
+	var kept []string
+nextEntry:
+	for _, entry := range boot.environ {
+		for _, prefix := range cleanEnvPrefixes {
+			if strings.HasPrefix(entry, prefix) {
+				// Matched a prefix: skip this entry entirely.
+				continue nextEntry
+			}
+		}
+		kept = append(kept, entry)
+	}
+	boot.environ = kept
+}
+
func (boot *Booter) setEnv(key, val string) {
for i, s := range boot.environ {
if strings.HasPrefix(s, key+"=") {
}
func (boot *Booter) setupRubyEnv() error {
- buf, err := exec.Command("gem", "env", "gempath").Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:...
+ cmd := exec.Command("gem", "env", "gempath")
+ cmd.Env = boot.environ
+ buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:...
if err != nil || len(buf) == 0 {
return fmt.Errorf("gem env gempath: %v", err)
}
boot.prependEnv("PATH", gempath+"/bin:")
boot.setEnv("GEM_HOME", gempath)
boot.setEnv("GEM_PATH", gempath)
+ // Passenger install doesn't work unless $HOME is ~user
+ u, err := user.Current()
+ if err != nil {
+ return err
+ }
+ boot.setEnv("HOME", u.HomeDir)
return nil
}
if !strings.HasPrefix(dir, "/") {
logprefix = dir + ": " + logprefix
}
- stderr := &logPrefixer{Writer: boot.Stderr, Prefix: []byte("[" + logprefix + "] ")}
cmd := exec.Command(boot.lookPath(prog), args...)
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return err
+ }
+ stderr, err := cmd.StderrPipe()
+ if err != nil {
+ return err
+ }
+ logwriter := &logPrefixer{Writer: boot.Stderr, Prefix: []byte("[" + logprefix + "] ")}
+ go io.Copy(logwriter, stderr)
if output == nil {
- cmd.Stdout = stderr
+ go io.Copy(logwriter, stdout)
} else {
- cmd.Stdout = output
+ go io.Copy(output, stdout)
}
- cmd.Stderr = stderr
+
if strings.HasPrefix(dir, "/") {
cmd.Dir = dir
} else {
cmd.Process.Signal(syscall.SIGTERM)
time.Sleep(5 * time.Second)
if !exited {
+ stdout.Close()
+ stderr.Close()
log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM")
}
}
}
}()
- err := cmd.Run()
+ err = cmd.Start()
+ if err != nil {
+ return err
+ }
+ err = cmd.Wait()
if err != nil && ctx.Err() == nil {
// Only report errors that happen before the context ends.
return fmt.Errorf("%s: error: %v", cmdline, err)