"os"
"os/exec"
"os/signal"
+ "os/user"
"path/filepath"
"strings"
"sync"
"git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- ch := make(chan os.Signal)
- signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
- go func() {
- for sig := range ch {
- boot.logger.WithField("signal", sig).Info("caught signal")
- cancel()
- }
- }()
-
var err error
defer func() {
if err != nil {
return 2
}
- boot.Start(ctx, loader)
+ loader.SkipAPICalls = true
+ cfg, err := loader.Load()
+ if err != nil {
+ return 1
+ }
+
+ boot.Start(ctx, cfg)
defer boot.Stop()
- if boot.WaitReady() {
- fmt.Fprintln(stdout, boot.cluster.Services.Controller.ExternalURL)
- <-ctx.Done() // wait for signal
+ if url, ok := boot.WaitReady(); ok {
+ fmt.Fprintln(stdout, url)
+ // Wait for signal/crash + orderly shutdown
+ <-boot.done
return 0
} else {
return 1
done chan struct{}
healthChecker *health.Aggregator
tasksReady map[string]chan bool
+ waitShutdown sync.WaitGroup
tempdir string
configfile string
goMutex sync.Mutex
}
-func (boot *Booter) Start(ctx context.Context, loader *config.Loader) {
+func (boot *Booter) Start(ctx context.Context, cfg *arvados.Config) {
boot.ctx, boot.cancel = context.WithCancel(ctx)
boot.done = make(chan struct{})
+
go func() {
- err := boot.run(loader)
+ sigch := make(chan os.Signal)
+ signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
+ defer signal.Stop(sigch)
+ go func() {
+ for sig := range sigch {
+ boot.logger.WithField("signal", sig).Info("caught signal")
+ boot.cancel()
+ }
+ }()
+
+ err := boot.run(cfg)
if err != nil {
fmt.Fprintln(boot.Stderr, err)
}
}()
}
-func (boot *Booter) run(loader *config.Loader) error {
+func (boot *Booter) run(cfg *arvados.Config) error {
cwd, err := os.Getwd()
if err != nil {
return err
return err
}
defer os.RemoveAll(boot.tempdir)
-
- loader.SkipAPICalls = true
- cfg, err := loader.Load()
- if err != nil {
+ if err := os.Mkdir(filepath.Join(boot.tempdir, "bin"), 0777); err != nil {
return err
}
boot.configfile = conffile.Name()
boot.environ = os.Environ()
+ boot.cleanEnv()
boot.setEnv("ARVADOS_CONFIG", boot.configfile)
boot.setEnv("RAILS_ENV", boot.ClusterType)
+ boot.setEnv("TMPDIR", boot.tempdir)
+ boot.prependEnv("PATH", filepath.Join(boot.tempdir, "bin")+":")
boot.prependEnv("PATH", filepath.Join(boot.LibPath, "bin")+":")
boot.cluster, err = cfg.GetCluster("")
boot.logger = ctxlog.New(boot.Stderr, boot.cluster.SystemLogs.Format, loglevel).WithFields(logrus.Fields{
"PID": os.Getpid(),
})
- boot.healthChecker = &health.Aggregator{Cluster: boot.cluster}
for _, dir := range []string{boot.LibPath, filepath.Join(boot.LibPath, "bin")} {
if _, err = os.Stat(filepath.Join(dir, ".")); os.IsNotExist(err) {
if err != nil {
return err
}
+ boot.logger.Info("all startup tasks are complete; starting health checks")
+ boot.healthChecker = &health.Aggregator{Cluster: boot.cluster}
<-boot.ctx.Done()
+ boot.logger.Info("shutting down")
+ boot.waitShutdown.Wait()
return boot.ctx.Err()
}
boot.logger.WithField("task", task.String()).Info("waiting")
select {
case <-ch:
+ boot.logger.WithField("task", task.String()).Info("ready")
case <-ctx.Done():
+ boot.logger.WithField("task", task.String()).Info("task was never ready")
return ctx.Err()
}
}
<-boot.done
}
-func (boot *Booter) WaitReady() bool {
+func (boot *Booter) WaitReady() (*arvados.URL, bool) {
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
for waiting := true; waiting; {
- time.Sleep(time.Second)
- if boot.ctx.Err() != nil {
- return false
+ select {
+ case <-ticker.C:
+ case <-boot.ctx.Done():
+ return nil, false
}
if boot.healthChecker == nil {
// not set up yet
}
}
}
- return true
+ u := boot.cluster.Services.Controller.ExternalURL
+ return &u, true
}
func (boot *Booter) prependEnv(key, prepend string) {
boot.environ = append(boot.environ, key+"="+prepend)
}
+// cleanEnvPrefixes lists prefixes of inherited environment entries that
+// cleanEnv discards. A prefix ending in "=" matches exactly one variable
+// name; "ARVADOS_" matches every variable whose name starts with it.
+// (GEM_HOME, GEM_PATH and ARVADOS_CONFIG are assigned again later during
+// startup, presumably so the invoking shell's values don't leak through.)
+var cleanEnvPrefixes = []string{"GEM_HOME=", "GEM_PATH=", "ARVADOS_"}
+
+// cleanEnv removes from boot.environ every entry that begins with one of
+// cleanEnvPrefixes. Surviving entries keep their relative order. The result
+// is a freshly built slice (nil when nothing survives); the previous
+// backing array is not modified.
+func (boot *Booter) cleanEnv() {
+	var kept []string
+entries:
+	for _, entry := range boot.environ {
+		for _, prefix := range cleanEnvPrefixes {
+			if strings.HasPrefix(entry, prefix) {
+				continue entries
+			}
+		}
+		kept = append(kept, entry)
+	}
+	boot.environ = kept
+}
+
func (boot *Booter) setEnv(key, val string) {
for i, s := range boot.environ {
if strings.HasPrefix(s, key+"=") {
boot.environ = append(boot.environ, key+"="+val)
}
+// dedupEnv returns the entries of in with all but the first occurrence of
+// each environment variable removed, so a caller can give its own settings
+// precedence by listing them before inherited ones.
+//
+// Entries that are not of the form "key=value" with a non-empty key have
+// no key to deduplicate on; they are passed through unchanged (empty
+// strings are dropped), as os/exec does, instead of panicking. The input
+// typically includes the inherited process environment, which this
+// program does not control.
+func dedupEnv(in []string) []string {
+	saw := make(map[string]bool, len(in))
+	// Start from nil so an empty input yields nil rather than an empty
+	// non-nil slice: exec.Cmd treats a nil Env as "inherit the parent's
+	// environment" but an empty one as "no environment at all".
+	var out []string
+	for _, kv := range in {
+		split := strings.Index(kv, "=")
+		switch {
+		case kv == "":
+			// Nothing to pass along.
+		case split < 1:
+			// Missing "=" or empty key: keep as-is rather than crash.
+			out = append(out, kv)
+		case saw[kv[:split]]:
+			// An earlier entry for this key already won.
+		default:
+			saw[kv[:split]] = true
+			out = append(out, kv)
+		}
+	}
+	return out
+}
+
+// installGoProgram runs "go install" in srcpath (relative to
+// boot.SourcePath), directing the built binaries into the "bin"
+// subdirectory of boot.tempdir, which is prepended to PATH during startup.
+// goMutex is held for the whole build, so concurrent callers are
+// serialized.
func (boot *Booter) installGoProgram(ctx context.Context, srcpath string) error {
boot.goMutex.Lock()
defer boot.goMutex.Unlock()
- return boot.RunProgram(ctx, filepath.Join(boot.SourcePath, srcpath), nil, []string{"GOPATH=" + boot.LibPath}, "go", "install")
+ return boot.RunProgram(ctx, filepath.Join(boot.SourcePath, srcpath), nil, []string{"GOBIN=" + filepath.Join(boot.tempdir, "bin")}, "go", "install")
}
func (boot *Booter) setupRubyEnv() error {
- buf, err := exec.Command("gem", "env", "gempath").Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:...
+ cmd := exec.Command("gem", "env", "gempath")
+ cmd.Env = boot.environ
+ buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:...
if err != nil || len(buf) == 0 {
return fmt.Errorf("gem env gempath: %v", err)
}
boot.prependEnv("PATH", gempath+"/bin:")
boot.setEnv("GEM_HOME", gempath)
boot.setEnv("GEM_PATH", gempath)
+ // Passenger install doesn't work unless $HOME is ~user
+ u, err := user.Current()
+ if err != nil {
+ return err
+ }
+ boot.setEnv("HOME", u.HomeDir)
return nil
}
// boot command's stderr.
func (boot *Booter) RunProgram(ctx context.Context, dir string, output io.Writer, env []string, prog string, args ...string) error {
cmdline := fmt.Sprintf("%s", append([]string{prog}, args...))
- fmt.Fprintf(boot.Stderr, "%s executing in %s\n", cmdline, dir)
+ boot.logger.WithField("command", cmdline).WithField("dir", dir).Info("executing")
logprefix := prog
if prog == "bundle" && len(args) > 2 && args[0] == "exec" {
logprefix = args[1]
+ } else if prog == "arvados-server" && len(args) > 1 {
+ logprefix = args[0]
}
if !strings.HasPrefix(dir, "/") {
logprefix = dir + ": " + logprefix
}
- stderr := &logPrefixer{Writer: boot.Stderr, Prefix: []byte("[" + logprefix + "] ")}
cmd := exec.Command(boot.lookPath(prog), args...)
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return err
+ }
+ stderr, err := cmd.StderrPipe()
+ if err != nil {
+ return err
+ }
+ logwriter := &service.LogPrefixer{Writer: boot.Stderr, Prefix: []byte("[" + logprefix + "] ")}
+ go io.Copy(logwriter, stderr)
if output == nil {
- cmd.Stdout = stderr
+ go io.Copy(logwriter, stdout)
} else {
- cmd.Stdout = output
+ go io.Copy(output, stdout)
}
- cmd.Stderr = stderr
+
if strings.HasPrefix(dir, "/") {
cmd.Dir = dir
} else {
cmd.Dir = filepath.Join(boot.SourcePath, dir)
}
- cmd.Env = append(env, boot.environ...)
+ env = append([]string(nil), env...)
+ env = append(env, boot.environ...)
+ cmd.Env = dedupEnv(env)
exited := false
defer func() { exited = true }()
cmd.Process.Signal(syscall.SIGTERM)
time.Sleep(5 * time.Second)
if !exited {
+ stdout.Close()
+ stderr.Close()
log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM")
}
}
}
}()
- err := cmd.Run()
+ err = cmd.Start()
+ if err != nil {
+ return err
+ }
+ err = cmd.Wait()
if err != nil && ctx.Err() == nil {
// Only report errors that happen before the context ends.
return fmt.Errorf("%s: error: %v", cmdline, err)