From 168fedbe65526ff3eabf155039d6e55a8f5eadf6 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 25 Feb 2020 18:44:48 -0500 Subject: [PATCH] 15954: Rename booter to supervisor, tidy up. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/boot/cert.go | 20 +- lib/boot/cmd.go | 713 +---------------------------- lib/boot/nginx.go | 44 +- lib/boot/passenger.go | 36 +- lib/boot/postgresql.go | 31 +- lib/boot/seed.go | 7 +- lib/boot/service.go | 76 +-- lib/boot/supervisor.go | 690 ++++++++++++++++++++++++++++ lib/controller/integration_test.go | 10 +- lib/service/cmd.go | 2 +- 10 files changed, 830 insertions(+), 799 deletions(-) create mode 100644 lib/boot/supervisor.go diff --git a/lib/boot/cert.go b/lib/boot/cert.go index 560579b777..508605bb7a 100644 --- a/lib/boot/cert.go +++ b/lib/boot/cert.go @@ -10,25 +10,31 @@ import ( "path/filepath" ) +// Create a root CA key and use it to make a new server +// certificate+key pair. +// +// In future we'll make one root CA key per host instead of one per +// cluster, so it only needs to be imported to a browser once for +// ongoing dev/test usage. type createCertificates struct{} func (createCertificates) String() string { return "certificates" } -func (createCertificates) Run(ctx context.Context, fail func(error), boot *Booter) error { +func (createCertificates) Run(ctx context.Context, fail func(error), super *Supervisor) error { // Generate root key - err := boot.RunProgram(ctx, boot.tempdir, nil, nil, "openssl", "genrsa", "-out", "rootCA.key", "4096") + err := super.RunProgram(ctx, super.tempdir, nil, nil, "openssl", "genrsa", "-out", "rootCA.key", "4096") if err != nil { return err } // Generate a self-signed root certificate - err = boot.RunProgram(ctx, boot.tempdir, nil, nil, "openssl", "req", "-x509", "-new", "-nodes", "-key", "rootCA.key", "-sha256", "-days", "3650", "-out", "rootCA.crt", "-subj", "/C=US/ST=MA/O=Example Org/CN=localhost") + err = super.RunProgram(ctx, super.tempdir, nil, nil, "openssl", "req", "-x509", "-new", "-nodes", "-key", "rootCA.key", "-sha256", "-days", "3650", "-out", "rootCA.crt", "-subj", "/C=US/ST=MA/O=Example Org/CN=localhost") if err != nil { return err } // Generate server key - err = boot.RunProgram(ctx, boot.tempdir, nil, nil, "openssl", "genrsa", "-out", "server.key", "2048") + err = super.RunProgram(ctx, super.tempdir, nil, nil, "openssl", "genrsa", "-out", "server.key", "2048") if err != nil { return err } @@ -37,7 +43,7 @@ func (createCertificates) Run(ctx context.Context, fail func(error), boot *Boote if err != nil { return err } - err = ioutil.WriteFile(filepath.Join(boot.tempdir, "server.cfg"), append(defaultconf, []byte(` + err = ioutil.WriteFile(filepath.Join(super.tempdir, "server.cfg"), append(defaultconf, []byte(` [SAN] subjectAltName=DNS:localhost,DNS:localhost.localdomain `)...), 0777) @@ -45,12 +51,12 @@ subjectAltName=DNS:localhost,DNS:localhost.localdomain return err } // Generate signing request - err = boot.RunProgram(ctx, boot.tempdir, nil, nil, "openssl", "req", "-new", "-sha256", "-key", "server.key", "-subj", "/C=US/ST=MA/O=Example Org/CN=localhost", "-reqexts", "SAN", "-config", "server.cfg", "-out", "server.csr") + err = super.RunProgram(ctx, super.tempdir, nil, nil, "openssl", "req", "-new", "-sha256", "-key", "server.key", "-subj", "/C=US/ST=MA/O=Example Org/CN=localhost", "-reqexts", "SAN", "-config", "server.cfg", "-out", "server.csr") if err != nil { return err } // Sign certificate - err = boot.RunProgram(ctx, boot.tempdir, nil, nil, "openssl", "x509", "-req", "-in", "server.csr", "-CA", "rootCA.crt", "-CAkey", "rootCA.key", "-CAcreateserial", "-out", "server.crt", "-days", "3650", "-sha256") + err = super.RunProgram(ctx, super.tempdir, nil, nil, "openssl", "x509", "-req", "-in", "server.csr", "-CA", "rootCA.crt", "-CAkey", "rootCA.key", "-CAcreateserial", "-out", "server.crt", "-days", "3650", "-sha256") if err != nil { return err } diff --git a/lib/boot/cmd.go b/lib/boot/cmd.go index e8a86e6ba6..f2266d6b58 100644 --- a/lib/boot/cmd.go +++ b/lib/boot/cmd.go @@ -5,77 +5,58 @@ package boot import ( - "bytes" "context" - "crypto/rand" - "encoding/json" - "errors" "flag" "fmt" "io" - "io/ioutil" - "net" - "os" - "os/exec" - "os/signal" - "os/user" - "path/filepath" - "strings" - "sync" - "syscall" - "time" "git.arvados.org/arvados.git/lib/cmd" "git.arvados.org/arvados.git/lib/config" - "git.arvados.org/arvados.git/lib/service" - "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" - "git.arvados.org/arvados.git/sdk/go/health" - "github.com/sirupsen/logrus" ) var Command cmd.Handler = bootCommand{} -type bootTask interface { +type supervisedTask interface { // Execute the task. Run should return nil when the task is // done enough to satisfy a dependency relationship (e.g., the // service is running and ready). If the task starts a // goroutine that fails after Run returns (e.g., the service // shuts down), it should call cancel. - Run(ctx context.Context, fail func(error), boot *Booter) error + Run(ctx context.Context, fail func(error), super *Supervisor) error String() string } type bootCommand struct{} func (bootCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { - boot := &Booter{ + super := &Supervisor{ Stderr: stderr, logger: ctxlog.New(stderr, "json", "info"), } - ctx := ctxlog.Context(context.Background(), boot.logger) + ctx := ctxlog.Context(context.Background(), super.logger) ctx, cancel := context.WithCancel(ctx) defer cancel() var err error defer func() { if err != nil { - boot.logger.WithError(err).Info("exiting") + super.logger.WithError(err).Info("exiting") } }() flags := flag.NewFlagSet(prog, flag.ContinueOnError) flags.SetOutput(stderr) - loader := config.NewLoader(stdin, boot.logger) + loader := config.NewLoader(stdin, super.logger) loader.SetupFlags(flags) versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0") - flags.StringVar(&boot.SourcePath, "source", ".", "arvados source tree `directory`") - flags.StringVar(&boot.LibPath, "lib", "/var/lib/arvados", "`directory` to install dependencies and library files") - flags.StringVar(&boot.ClusterType, "type", "production", "cluster `type`: development, test, or production") - flags.StringVar(&boot.ListenHost, "listen-host", "localhost", "host name or interface address for service listeners") - flags.StringVar(&boot.ControllerAddr, "controller-address", ":0", "desired controller address, `host:port` or `:port`") - flags.BoolVar(&boot.OwnTemporaryDatabase, "own-temporary-database", false, "bring up a postgres server and create a temporary database") + flags.StringVar(&super.SourcePath, "source", ".", "arvados source tree `directory`") + flags.StringVar(&super.LibPath, "lib", "/var/lib/arvados", "`directory` to install dependencies and library files") + flags.StringVar(&super.ClusterType, "type", "production", "cluster `type`: development, test, or production") + flags.StringVar(&super.ListenHost, "listen-host", "localhost", "host name or interface address for service listeners") + flags.StringVar(&super.ControllerAddr, "controller-address", ":0", "desired controller address, `host:port` or `:port`") + flags.BoolVar(&super.OwnTemporaryDatabase, "own-temporary-database", false, "bring up a postgres server and create a temporary database") err = flags.Parse(args) if err == flag.ErrHelp { err = nil @@ -84,7 +65,7 @@ func (bootCommand) RunCommand(prog string, args []string, stdin io.Reader, stdou return 2 } else if *versionFlag { return cmd.Version.RunCommand(prog, args, stdin, stdout, stderr) - } else if boot.ClusterType != "development" && boot.ClusterType != "test" && boot.ClusterType != "production" { + } else if super.ClusterType != "development" && super.ClusterType != "test" && super.ClusterType != "production" { err = fmt.Errorf("cluster type must be 'development', 'test', or 'production'") return 2 } @@ -95,674 +76,14 @@ func (bootCommand) RunCommand(prog string, args []string, stdin io.Reader, stdou return 1 } - boot.Start(ctx, cfg) - defer boot.Stop() - if url, ok := boot.WaitReady(); ok { + super.Start(ctx, cfg) + defer super.Stop() + if url, ok := super.WaitReady(); ok { fmt.Fprintln(stdout, url) // Wait for signal/crash + orderly shutdown - <-boot.done + <-super.done return 0 } else { return 1 } } - -type Booter struct { - SourcePath string // e.g., /home/username/src/arvados - SourceVersion string // e.g., acbd1324... - LibPath string // e.g., /var/lib/arvados - ClusterType string // e.g., production - ListenHost string // e.g., localhost - ControllerAddr string // e.g., 127.0.0.1:8000 - OwnTemporaryDatabase bool - Stderr io.Writer - - logger logrus.FieldLogger - cluster *arvados.Cluster - - ctx context.Context - cancel context.CancelFunc - done chan struct{} - healthChecker *health.Aggregator - tasksReady map[string]chan bool - waitShutdown sync.WaitGroup - - tempdir string - configfile string - environ []string // for child processes - - setupRubyOnce sync.Once - setupRubyErr error - goMutex sync.Mutex -} - -func (boot *Booter) Start(ctx context.Context, cfg *arvados.Config) { - boot.ctx, boot.cancel = context.WithCancel(ctx) - boot.done = make(chan struct{}) - - go func() { - sigch := make(chan os.Signal) - signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM) - defer signal.Stop(sigch) - go func() { - for sig := range sigch { - boot.logger.WithField("signal", sig).Info("caught signal") - boot.cancel() - } - }() - - err := boot.run(cfg) - if err != nil { - fmt.Fprintln(boot.Stderr, err) - } - close(boot.done) - }() -} - -func (boot *Booter) run(cfg *arvados.Config) error { - cwd, err := os.Getwd() - if err != nil { - return err - } - if !strings.HasPrefix(boot.SourcePath, "/") { - boot.SourcePath = filepath.Join(cwd, boot.SourcePath) - } - boot.SourcePath, err = filepath.EvalSymlinks(boot.SourcePath) - if err != nil { - return err - } - - boot.tempdir, err = ioutil.TempDir("", "arvados-server-boot-") - if err != nil { - return err - } - defer os.RemoveAll(boot.tempdir) - if err := os.Mkdir(filepath.Join(boot.tempdir, "bin"), 0777); err != nil { - return err - } - - // Fill in any missing config keys, and write the resulting - // config in the temp dir for child services to use. - err = boot.autofillConfig(cfg, boot.logger) - if err != nil { - return err - } - conffile, err := os.OpenFile(filepath.Join(boot.tempdir, "config.yml"), os.O_CREATE|os.O_WRONLY, 0777) - if err != nil { - return err - } - defer conffile.Close() - err = json.NewEncoder(conffile).Encode(cfg) - if err != nil { - return err - } - err = conffile.Close() - if err != nil { - return err - } - boot.configfile = conffile.Name() - - boot.environ = os.Environ() - boot.cleanEnv() - boot.setEnv("ARVADOS_CONFIG", boot.configfile) - boot.setEnv("RAILS_ENV", boot.ClusterType) - boot.setEnv("TMPDIR", boot.tempdir) - boot.prependEnv("PATH", filepath.Join(boot.tempdir, "bin")+":") - boot.prependEnv("PATH", filepath.Join(boot.LibPath, "bin")+":") - - boot.cluster, err = cfg.GetCluster("") - if err != nil { - return err - } - // Now that we have the config, replace the bootstrap logger - // with a new one according to the logging config. - loglevel := boot.cluster.SystemLogs.LogLevel - if s := os.Getenv("ARVADOS_DEBUG"); s != "" && s != "0" { - loglevel = "debug" - } - boot.logger = ctxlog.New(boot.Stderr, boot.cluster.SystemLogs.Format, loglevel).WithFields(logrus.Fields{ - "PID": os.Getpid(), - }) - - if boot.SourceVersion == "" { - // Find current source tree version. - var buf bytes.Buffer - err = boot.RunProgram(boot.ctx, ".", &buf, nil, "git", "diff", "--shortstat") - if err != nil { - return err - } - dirty := buf.Len() > 0 - buf.Reset() - err = boot.RunProgram(boot.ctx, ".", &buf, nil, "git", "log", "-n1", "--format=%H") - if err != nil { - return err - } - boot.SourceVersion = strings.TrimSpace(buf.String()) - if dirty { - boot.SourceVersion += "+uncommitted" - } - } else { - return errors.New("specifying a version to run is not yet supported") - } - for _, dir := range []string{boot.LibPath, filepath.Join(boot.LibPath, "bin")} { - if _, err = os.Stat(filepath.Join(dir, ".")); os.IsNotExist(err) { - err = os.Mkdir(dir, 0755) - if err != nil { - return err - } - } else if err != nil { - return err - } - } - _, err = boot.installGoProgram(boot.ctx, "cmd/arvados-server") - if err != nil { - return err - } - err = boot.setupRubyEnv() - if err != nil { - return err - } - - tasks := []bootTask{ - createCertificates{}, - runPostgreSQL{}, - runNginx{}, - runServiceCommand{name: "controller", svc: boot.cluster.Services.Controller, depends: []bootTask{runPostgreSQL{}}}, - runGoProgram{src: "services/arv-git-httpd"}, - runGoProgram{src: "services/health"}, - runGoProgram{src: "services/keepproxy", depends: []bootTask{runPassenger{src: "services/api"}}}, - runGoProgram{src: "services/keepstore", svc: boot.cluster.Services.Keepstore}, - runGoProgram{src: "services/keep-web"}, - runGoProgram{src: "services/ws", depends: []bootTask{runPostgreSQL{}}}, - installPassenger{src: "services/api"}, - runPassenger{src: "services/api", svc: boot.cluster.Services.RailsAPI, depends: []bootTask{createCertificates{}, runPostgreSQL{}, installPassenger{src: "services/api"}}}, - installPassenger{src: "apps/workbench", depends: []bootTask{installPassenger{src: "services/api"}}}, // dependency ensures workbench doesn't delay api startup - runPassenger{src: "apps/workbench", svc: boot.cluster.Services.Workbench1, depends: []bootTask{installPassenger{src: "apps/workbench"}}}, - seedDatabase{}, - } - if boot.ClusterType != "test" { - tasks = append(tasks, - runServiceCommand{name: "dispatch-cloud", svc: boot.cluster.Services.Controller}, - runGoProgram{src: "services/keep-balance"}, - ) - } - boot.tasksReady = map[string]chan bool{} - for _, task := range tasks { - boot.tasksReady[task.String()] = make(chan bool) - } - for _, task := range tasks { - task := task - fail := func(err error) { - if boot.ctx.Err() != nil { - return - } - boot.cancel() - boot.logger.WithField("task", task.String()).WithError(err).Error("task failed") - } - go func() { - boot.logger.WithField("task", task.String()).Info("starting") - err := task.Run(boot.ctx, fail, boot) - if err != nil { - fail(err) - return - } - close(boot.tasksReady[task.String()]) - }() - } - err = boot.wait(boot.ctx, tasks...) - if err != nil { - return err - } - boot.logger.Info("all startup tasks are complete; starting health checks") - boot.healthChecker = &health.Aggregator{Cluster: boot.cluster} - <-boot.ctx.Done() - boot.logger.Info("shutting down") - boot.waitShutdown.Wait() - return boot.ctx.Err() -} - -func (boot *Booter) wait(ctx context.Context, tasks ...bootTask) error { - for _, task := range tasks { - ch, ok := boot.tasksReady[task.String()] - if !ok { - return fmt.Errorf("no such task: %s", task) - } - boot.logger.WithField("task", task.String()).Info("waiting") - select { - case <-ch: - boot.logger.WithField("task", task.String()).Info("ready") - case <-ctx.Done(): - boot.logger.WithField("task", task.String()).Info("task was never ready") - return ctx.Err() - } - } - return nil -} - -func (boot *Booter) Stop() { - boot.cancel() - <-boot.done -} - -func (boot *Booter) WaitReady() (*arvados.URL, bool) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for waiting := true; waiting; { - select { - case <-ticker.C: - case <-boot.ctx.Done(): - return nil, false - } - if boot.healthChecker == nil { - // not set up yet - continue - } - resp := boot.healthChecker.ClusterHealth() - // The overall health check (resp.Health=="OK") might - // never pass due to missing components (like - // arvados-dispatch-cloud in a test cluster), so - // instead we wait for all configured components to - // pass. - waiting = false - for target, check := range resp.Checks { - if check.Health != "OK" { - waiting = true - boot.logger.WithField("target", target).Debug("waiting") - } - } - } - u := boot.cluster.Services.Controller.ExternalURL - return &u, true -} - -func (boot *Booter) prependEnv(key, prepend string) { - for i, s := range boot.environ { - if strings.HasPrefix(s, key+"=") { - boot.environ[i] = key + "=" + prepend + s[len(key)+1:] - return - } - } - boot.environ = append(boot.environ, key+"="+prepend) -} - -var cleanEnvPrefixes = []string{ - "GEM_HOME=", - "GEM_PATH=", - "ARVADOS_", -} - -func (boot *Booter) cleanEnv() { - var cleaned []string - for _, s := range boot.environ { - drop := false - for _, p := range cleanEnvPrefixes { - if strings.HasPrefix(s, p) { - drop = true - break - } - } - if !drop { - cleaned = append(cleaned, s) - } - } - boot.environ = cleaned -} - -func (boot *Booter) setEnv(key, val string) { - for i, s := range boot.environ { - if strings.HasPrefix(s, key+"=") { - boot.environ[i] = key + "=" + val - return - } - } - boot.environ = append(boot.environ, key+"="+val) -} - -// Remove all but the first occurrence of each env var. -func dedupEnv(in []string) []string { - saw := map[string]bool{} - var out []string - for _, kv := range in { - if split := strings.Index(kv, "="); split < 1 { - panic("invalid environment var: " + kv) - } else if saw[kv[:split]] { - continue - } else { - saw[kv[:split]] = true - out = append(out, kv) - } - } - return out -} - -func (boot *Booter) installGoProgram(ctx context.Context, srcpath string) (string, error) { - _, basename := filepath.Split(srcpath) - bindir := filepath.Join(boot.tempdir, "bin") - binfile := filepath.Join(bindir, basename) - err := boot.RunProgram(ctx, filepath.Join(boot.SourcePath, srcpath), nil, []string{"GOBIN=" + bindir}, "go", "install", "-ldflags", "-X git.arvados.org/arvados.git/lib/cmd.version="+boot.SourceVersion+" -X main.version="+boot.SourceVersion) - return binfile, err -} - -func (boot *Booter) setupRubyEnv() error { - cmd := exec.Command("gem", "env", "gempath") - cmd.Env = boot.environ - buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:... - if err != nil || len(buf) == 0 { - return fmt.Errorf("gem env gempath: %v", err) - } - gempath := string(bytes.Split(buf, []byte{':'})[0]) - boot.prependEnv("PATH", gempath+"/bin:") - boot.setEnv("GEM_HOME", gempath) - boot.setEnv("GEM_PATH", gempath) - // Passenger install doesn't work unless $HOME is ~user - u, err := user.Current() - if err != nil { - return err - } - boot.setEnv("HOME", u.HomeDir) - return nil -} - -func (boot *Booter) lookPath(prog string) string { - for _, val := range boot.environ { - if strings.HasPrefix(val, "PATH=") { - for _, dir := range filepath.SplitList(val[5:]) { - path := filepath.Join(dir, prog) - if fi, err := os.Stat(path); err == nil && fi.Mode()&0111 != 0 { - return path - } - } - } - } - return prog -} - -// Run prog with args, using dir as working directory. If ctx is -// cancelled while the child is running, RunProgram terminates the -// child, waits for it to exit, then returns. -// -// Child's environment will have our env vars, plus any given in env. -// -// Child's stdout will be written to output if non-nil, otherwise the -// boot command's stderr. -func (boot *Booter) RunProgram(ctx context.Context, dir string, output io.Writer, env []string, prog string, args ...string) error { - cmdline := fmt.Sprintf("%s", append([]string{prog}, args...)) - boot.logger.WithField("command", cmdline).WithField("dir", dir).Info("executing") - - logprefix := strings.TrimPrefix(prog, boot.tempdir+"/bin/") - if prog == "bundle" && len(args) > 2 && args[0] == "exec" { - logprefix = args[1] - } else if prog == "arvados-server" && len(args) > 1 { - logprefix = args[0] - } - if !strings.HasPrefix(dir, "/") { - logprefix = dir + ": " + logprefix - } - - cmd := exec.Command(boot.lookPath(prog), args...) - stdout, err := cmd.StdoutPipe() - if err != nil { - return err - } - stderr, err := cmd.StderrPipe() - if err != nil { - return err - } - logwriter := &service.LogPrefixer{Writer: boot.Stderr, Prefix: []byte("[" + logprefix + "] ")} - var copiers sync.WaitGroup - copiers.Add(1) - go func() { - io.Copy(logwriter, stderr) - copiers.Done() - }() - copiers.Add(1) - go func() { - if output == nil { - io.Copy(logwriter, stdout) - } else { - io.Copy(output, stdout) - } - copiers.Done() - }() - - if strings.HasPrefix(dir, "/") { - cmd.Dir = dir - } else { - cmd.Dir = filepath.Join(boot.SourcePath, dir) - } - env = append([]string(nil), env...) - env = append(env, boot.environ...) - cmd.Env = dedupEnv(env) - - exited := false - defer func() { exited = true }() - go func() { - <-ctx.Done() - log := ctxlog.FromContext(ctx).WithFields(logrus.Fields{"dir": dir, "cmdline": cmdline}) - for !exited { - if cmd.Process == nil { - log.Debug("waiting for child process to start") - time.Sleep(time.Second / 2) - } else { - log.WithField("PID", cmd.Process.Pid).Debug("sending SIGTERM") - cmd.Process.Signal(syscall.SIGTERM) - time.Sleep(5 * time.Second) - if !exited { - stdout.Close() - stderr.Close() - log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM") - } - } - } - }() - - err = cmd.Start() - if err != nil { - return err - } - copiers.Wait() - err = cmd.Wait() - if ctx.Err() != nil { - // Return "context canceled", instead of the "killed" - // error that was probably caused by the context being - // canceled. - return ctx.Err() - } else if err != nil { - return fmt.Errorf("%s: error: %v", cmdline, err) - } - return nil -} - -func (boot *Booter) autofillConfig(cfg *arvados.Config, log logrus.FieldLogger) error { - cluster, err := cfg.GetCluster("") - if err != nil { - return err - } - usedPort := map[string]bool{} - nextPort := func() string { - for { - port, err := availablePort(boot.ListenHost + ":0") - if err != nil { - panic(err) - } - if usedPort[port] { - continue - } - usedPort[port] = true - return port - } - } - if cluster.Services.Controller.ExternalURL.Host == "" { - h, p, err := net.SplitHostPort(boot.ControllerAddr) - if err != nil { - return err - } - if h == "" { - h = boot.ListenHost - } - if p == "0" { - p, err = availablePort(":0") - if err != nil { - return err - } - usedPort[p] = true - } - cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p)} - } - for _, svc := range []*arvados.Service{ - &cluster.Services.Controller, - &cluster.Services.DispatchCloud, - &cluster.Services.GitHTTP, - &cluster.Services.Health, - &cluster.Services.Keepproxy, - &cluster.Services.Keepstore, - &cluster.Services.RailsAPI, - &cluster.Services.WebDAV, - &cluster.Services.WebDAVDownload, - &cluster.Services.Websocket, - &cluster.Services.Workbench1, - } { - if svc == &cluster.Services.DispatchCloud && boot.ClusterType == "test" { - continue - } - if svc.ExternalURL.Host == "" && (svc == &cluster.Services.Controller || - svc == &cluster.Services.GitHTTP || - svc == &cluster.Services.Keepproxy || - svc == &cluster.Services.WebDAV || - svc == &cluster.Services.WebDAVDownload || - svc == &cluster.Services.Websocket || - svc == &cluster.Services.Workbench1) { - svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", boot.ListenHost, nextPort())} - } - if len(svc.InternalURLs) == 0 { - svc.InternalURLs = map[arvados.URL]arvados.ServiceInstance{ - arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", boot.ListenHost, nextPort())}: arvados.ServiceInstance{}, - } - } - } - if cluster.SystemRootToken == "" { - cluster.SystemRootToken = randomHexString(64) - } - if cluster.ManagementToken == "" { - cluster.ManagementToken = randomHexString(64) - } - if cluster.API.RailsSessionSecretToken == "" { - cluster.API.RailsSessionSecretToken = randomHexString(64) - } - if cluster.Collections.BlobSigningKey == "" { - cluster.Collections.BlobSigningKey = randomHexString(64) - } - if boot.ClusterType != "production" && cluster.Containers.DispatchPrivateKey == "" { - buf, err := ioutil.ReadFile(filepath.Join(boot.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch")) - if err != nil { - return err - } - cluster.Containers.DispatchPrivateKey = string(buf) - } - if boot.ClusterType != "production" { - cluster.TLS.Insecure = true - } - if boot.ClusterType == "test" { - // Add a second keepstore process. - cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", boot.ListenHost, nextPort())}] = arvados.ServiceInstance{} - - // Create a directory-backed volume for each keepstore - // process. - cluster.Volumes = map[string]arvados.Volume{} - for url := range cluster.Services.Keepstore.InternalURLs { - volnum := len(cluster.Volumes) - datadir := fmt.Sprintf("%s/keep%d.data", boot.tempdir, volnum) - if _, err = os.Stat(datadir + "/."); err == nil { - } else if !os.IsNotExist(err) { - return err - } else if err = os.Mkdir(datadir, 0777); err != nil { - return err - } - cluster.Volumes[fmt.Sprintf(cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{ - Driver: "Directory", - DriverParameters: json.RawMessage(fmt.Sprintf(`{"Root":%q}`, datadir)), - AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{ - url: {}, - }, - } - } - } - if boot.OwnTemporaryDatabase { - cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{ - "client_encoding": "utf8", - "host": "localhost", - "port": nextPort(), - "dbname": "arvados_test", - "user": "arvados", - "password": "insecure_arvados_test", - } - } - - cfg.Clusters[cluster.ClusterID] = *cluster - return nil -} - -func randomHexString(chars int) string { - b := make([]byte, chars/2) - _, err := rand.Read(b) - if err != nil { - panic(err) - } - return fmt.Sprintf("%x", b) -} - -func internalPort(svc arvados.Service) (string, error) { - for u := range svc.InternalURLs { - if _, p, err := net.SplitHostPort(u.Host); err != nil { - return "", err - } else if p != "" { - return p, nil - } else if u.Scheme == "https" { - return "443", nil - } else { - return "80", nil - } - } - return "", fmt.Errorf("service has no InternalURLs") -} - -func externalPort(svc arvados.Service) (string, error) { - if _, p, err := net.SplitHostPort(svc.ExternalURL.Host); err != nil { - return "", err - } else if p != "" { - return p, nil - } else if svc.ExternalURL.Scheme == "https" { - return "443", nil - } else { - return "80", nil - } -} - -func availablePort(addr string) (string, error) { - ln, err := net.Listen("tcp", addr) - if err != nil { - return "", err - } - defer ln.Close() - _, port, err := net.SplitHostPort(ln.Addr().String()) - if err != nil { - return "", err - } - return port, nil -} - -// Try to connect to addr until it works, then close ch. Give up if -// ctx cancels. -func waitForConnect(ctx context.Context, addr string) error { - dialer := net.Dialer{Timeout: time.Second} - for ctx.Err() == nil { - conn, err := dialer.DialContext(ctx, "tcp", addr) - if err != nil { - time.Sleep(time.Second / 10) - continue - } - conn.Close() - return nil - } - return ctx.Err() -} diff --git a/lib/boot/nginx.go b/lib/boot/nginx.go index a06d3a7008..2d5c74594e 100644 --- a/lib/boot/nginx.go +++ b/lib/boot/nginx.go @@ -16,33 +16,35 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvados" ) +// Run an Nginx process that proxies the supervisor's configured +// ExternalURLs to the appropriate InternalURLs. type runNginx struct{} func (runNginx) String() string { return "nginx" } -func (runNginx) Run(ctx context.Context, fail func(error), boot *Booter) error { +func (runNginx) Run(ctx context.Context, fail func(error), super *Supervisor) error { vars := map[string]string{ - "LISTENHOST": boot.ListenHost, - "SSLCERT": filepath.Join(boot.SourcePath, "services", "api", "tmp", "self-signed.pem"), // TODO: root ca - "SSLKEY": filepath.Join(boot.SourcePath, "services", "api", "tmp", "self-signed.key"), // TODO: root ca - "ACCESSLOG": filepath.Join(boot.tempdir, "nginx_access.log"), - "ERRORLOG": filepath.Join(boot.tempdir, "nginx_error.log"), - "TMPDIR": boot.tempdir, + "LISTENHOST": super.ListenHost, + "SSLCERT": filepath.Join(super.SourcePath, "services", "api", "tmp", "self-signed.pem"), // TODO: root ca + "SSLKEY": filepath.Join(super.SourcePath, "services", "api", "tmp", "self-signed.key"), // TODO: root ca + "ACCESSLOG": filepath.Join(super.tempdir, "nginx_access.log"), + "ERRORLOG": filepath.Join(super.tempdir, "nginx_error.log"), + "TMPDIR": super.tempdir, } var err error for _, cmpt := range []struct { varname string svc arvados.Service }{ - {"CONTROLLER", boot.cluster.Services.Controller}, - {"KEEPWEB", boot.cluster.Services.WebDAV}, - {"KEEPWEBDL", boot.cluster.Services.WebDAVDownload}, - {"KEEPPROXY", boot.cluster.Services.Keepproxy}, - {"GIT", boot.cluster.Services.GitHTTP}, - {"WORKBENCH1", boot.cluster.Services.Workbench1}, - {"WS", boot.cluster.Services.Websocket}, + {"CONTROLLER", super.cluster.Services.Controller}, + {"KEEPWEB", super.cluster.Services.WebDAV}, + {"KEEPWEBDL", super.cluster.Services.WebDAVDownload}, + {"KEEPPROXY", super.cluster.Services.Keepproxy}, + {"GIT", super.cluster.Services.GitHTTP}, + {"WORKBENCH1", super.cluster.Services.Workbench1}, + {"WS", super.cluster.Services.Websocket}, } { vars[cmpt.varname+"PORT"], err = internalPort(cmpt.svc) if err != nil { @@ -53,7 +55,7 @@ func (runNginx) Run(ctx context.Context, fail func(error), boot *Booter) error { return fmt.Errorf("%s external port: %s (%v)", cmpt.varname, err, cmpt.svc) } } - tmpl, err := ioutil.ReadFile(filepath.Join(boot.SourcePath, "sdk", "python", "tests", "nginx.conf")) + tmpl, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "sdk", "python", "tests", "nginx.conf")) if err != nil { return err } @@ -63,7 +65,7 @@ func (runNginx) Run(ctx context.Context, fail func(error), boot *Booter) error { } return vars[src[2:len(src)-2]] }) - conffile := filepath.Join(boot.tempdir, "nginx.conf") + conffile := filepath.Join(super.tempdir, "nginx.conf") err = ioutil.WriteFile(conffile, []byte(conf), 0755) if err != nil { return err @@ -77,13 +79,13 @@ func (runNginx) Run(ctx context.Context, fail func(error), boot *Booter) error { } } } - boot.waitShutdown.Add(1) + super.waitShutdown.Add(1) go func() { - defer boot.waitShutdown.Done() - fail(boot.RunProgram(ctx, ".", nil, nil, nginx, + defer super.waitShutdown.Done() + fail(super.RunProgram(ctx, ".", nil, nil, nginx, "-g", "error_log stderr info;", - "-g", "pid "+filepath.Join(boot.tempdir, "nginx.pid")+";", + "-g", "pid "+filepath.Join(super.tempdir, "nginx.pid")+";", "-c", conffile)) }() - return waitForConnect(ctx, boot.cluster.Services.Controller.ExternalURL.Host) + return waitForConnect(ctx, super.cluster.Services.Controller.ExternalURL.Host) } diff --git a/lib/boot/passenger.go b/lib/boot/passenger.go index 10581a697e..36be2f1a0d 100644 --- a/lib/boot/passenger.go +++ b/lib/boot/passenger.go @@ -20,17 +20,19 @@ import ( // concurrent installs. var passengerInstallMutex sync.Mutex +// Install a Rails application's dependencies, including phusion +// passenger. type installPassenger struct { src string - depends []bootTask + depends []supervisedTask } func (runner installPassenger) String() string { return "installPassenger:" + runner.src } -func (runner installPassenger) Run(ctx context.Context, fail func(error), boot *Booter) error { - err := boot.wait(ctx, runner.depends...) +func (runner installPassenger) Run(ctx context.Context, fail func(error), super *Supervisor) error { + err := super.wait(ctx, runner.depends...) if err != nil { return err } @@ -39,32 +41,32 @@ func (runner installPassenger) Run(ctx context.Context, fail func(error), boot * defer passengerInstallMutex.Unlock() var buf bytes.Buffer - err = boot.RunProgram(ctx, runner.src, &buf, nil, "gem", "list", "--details", "bundler") + err = super.RunProgram(ctx, runner.src, &buf, nil, "gem", "list", "--details", "bundler") if err != nil { return err } for _, version := range []string{"1.11.0", "1.17.3", "2.0.2"} { if !strings.Contains(buf.String(), "("+version+")") { - err = boot.RunProgram(ctx, runner.src, nil, nil, "gem", "install", "--user", "bundler:1.11", "bundler:1.17.3", "bundler:2.0.2") + err = super.RunProgram(ctx, runner.src, nil, nil, "gem", "install", "--user", "bundler:1.11", "bundler:1.17.3", "bundler:2.0.2") if err != nil { return err } break } } - err = boot.RunProgram(ctx, runner.src, nil, nil, "bundle", "install", "--jobs", "4", "--path", filepath.Join(os.Getenv("HOME"), ".gem")) + err = super.RunProgram(ctx, runner.src, nil, nil, "bundle", "install", "--jobs", "4", "--path", filepath.Join(os.Getenv("HOME"), ".gem")) if err != nil { return err } - err = boot.RunProgram(ctx, runner.src, nil, nil, "bundle", "exec", "passenger-config", "build-native-support") + err = super.RunProgram(ctx, runner.src, nil, nil, "bundle", "exec", "passenger-config", "build-native-support") if err != nil { return err } - err = boot.RunProgram(ctx, runner.src, nil, nil, "bundle", "exec", "passenger-config", "install-standalone-runtime") + err = super.RunProgram(ctx, runner.src, nil, nil, "bundle", "exec", "passenger-config", "install-standalone-runtime") if err != nil { return err } - err = boot.RunProgram(ctx, runner.src, nil, nil, "bundle", "exec", "passenger-config", "validate-install") + err = super.RunProgram(ctx, runner.src, nil, nil, "bundle", "exec", "passenger-config", "validate-install") if err != nil { return err } @@ -74,15 +76,15 @@ func (runner installPassenger) Run(ctx context.Context, fail func(error), boot * type runPassenger struct { src string svc arvados.Service - depends []bootTask + depends []supervisedTask } func (runner runPassenger) String() string { return "runPassenger:" + runner.src } -func (runner runPassenger) Run(ctx context.Context, fail func(error), boot *Booter) error { - err := boot.wait(ctx, runner.depends...) +func (runner runPassenger) Run(ctx context.Context, fail func(error), super *Supervisor) error { + err := super.wait(ctx, runner.depends...) if err != nil { return err } @@ -99,19 +101,19 @@ func (runner runPassenger) Run(ctx context.Context, fail func(error), boot *Boot "error": "1", "fatal": "0", "panic": "0", - }[boot.cluster.SystemLogs.LogLevel]; ok { + }[super.cluster.SystemLogs.LogLevel]; ok { loglevel = lvl } - boot.waitShutdown.Add(1) + super.waitShutdown.Add(1) go func() { - defer boot.waitShutdown.Done() - err = boot.RunProgram(ctx, runner.src, nil, nil, "bundle", "exec", + defer super.waitShutdown.Done() + err = super.RunProgram(ctx, runner.src, nil, nil, "bundle", "exec", "passenger", "start", "-p", port, "--log-file", "/dev/stderr", "--log-level", loglevel, "--no-friendly-error-pages", - "--pid-file", filepath.Join(boot.tempdir, "passenger."+strings.Replace(runner.src, "/", "_", -1)+".pid")) + "--pid-file", filepath.Join(super.tempdir, "passenger."+strings.Replace(runner.src, "/", "_", -1)+".pid")) fail(err) }() return nil diff --git a/lib/boot/postgresql.go b/lib/boot/postgresql.go index 9f0de2dd67..df98904151 100644 --- a/lib/boot/postgresql.go +++ b/lib/boot/postgresql.go @@ -19,50 +19,53 @@ import ( "github.com/lib/pq" ) +// Run a postgresql server in a private data directory. Set up a db +// user, database, and TCP listener that match the supervisor's +// configured database connection info. type runPostgreSQL struct{} func (runPostgreSQL) String() string { return "postgresql" } -func (runPostgreSQL) Run(ctx context.Context, fail func(error), boot *Booter) error { - err := boot.wait(ctx, createCertificates{}) +func (runPostgreSQL) Run(ctx context.Context, fail func(error), super *Supervisor) error { + err := super.wait(ctx, createCertificates{}) if err != nil { return err } buf := bytes.NewBuffer(nil) - err = boot.RunProgram(ctx, boot.tempdir, buf, nil, "pg_config", "--bindir") + err = super.RunProgram(ctx, super.tempdir, buf, nil, "pg_config", "--bindir") if err != nil { return err } bindir := strings.TrimSpace(buf.String()) - datadir := filepath.Join(boot.tempdir, "pgdata") + datadir := filepath.Join(super.tempdir, "pgdata") err = os.Mkdir(datadir, 0755) if err != nil { return err } - err = boot.RunProgram(ctx, boot.tempdir, nil, nil, filepath.Join(bindir, "initdb"), "-D", datadir) + err = super.RunProgram(ctx, super.tempdir, nil, nil, filepath.Join(bindir, "initdb"), "-D", datadir) if err != nil { return err } - err = boot.RunProgram(ctx, boot.tempdir, nil, nil, "cp", "server.crt", "server.key", datadir) + err = super.RunProgram(ctx, super.tempdir, nil, nil, "cp", "server.crt", "server.key", datadir) if err != nil { return err } - port := boot.cluster.PostgreSQL.Connection["port"] + port := super.cluster.PostgreSQL.Connection["port"] - boot.waitShutdown.Add(1) + super.waitShutdown.Add(1) go func() { - defer boot.waitShutdown.Done() - fail(boot.RunProgram(ctx, boot.tempdir, nil, nil, filepath.Join(bindir, "postgres"), + defer super.waitShutdown.Done() + fail(super.RunProgram(ctx, super.tempdir, nil, nil, filepath.Join(bindir, "postgres"), "-l", // enable ssl "-D", datadir, // data dir "-k", datadir, // socket dir - "-p", boot.cluster.PostgreSQL.Connection["port"], + "-p", super.cluster.PostgreSQL.Connection["port"], )) }() @@ -70,7 +73,7 @@ func (runPostgreSQL) Run(ctx context.Context, fail func(error), boot *Booter) er if ctx.Err() != nil { return ctx.Err() } - if exec.CommandContext(ctx, "pg_isready", "--timeout=10", "--host="+boot.cluster.PostgreSQL.Connection["host"], "--port="+port).Run() == nil { + if exec.CommandContext(ctx, "pg_isready", "--timeout=10", "--host="+super.cluster.PostgreSQL.Connection["host"], "--port="+port).Run() == nil { break } time.Sleep(time.Second / 2) @@ -89,11 +92,11 @@ func (runPostgreSQL) Run(ctx context.Context, fail func(error), boot *Booter) er return fmt.Errorf("db conn failed: %s", err) } defer conn.Close() - _, err = conn.ExecContext(ctx, `CREATE USER `+pq.QuoteIdentifier(boot.cluster.PostgreSQL.Connection["user"])+` WITH SUPERUSER ENCRYPTED PASSWORD `+pq.QuoteLiteral(boot.cluster.PostgreSQL.Connection["password"])) + _, err = conn.ExecContext(ctx, `CREATE USER `+pq.QuoteIdentifier(super.cluster.PostgreSQL.Connection["user"])+` WITH SUPERUSER ENCRYPTED PASSWORD `+pq.QuoteLiteral(super.cluster.PostgreSQL.Connection["password"])) if err != nil { return fmt.Errorf("createuser failed: %s", err) } - _, err = conn.ExecContext(ctx, `CREATE DATABASE `+pq.QuoteIdentifier(boot.cluster.PostgreSQL.Connection["dbname"])) + _, err = conn.ExecContext(ctx, `CREATE DATABASE `+pq.QuoteIdentifier(super.cluster.PostgreSQL.Connection["dbname"])) if err != nil { return fmt.Errorf("createdb failed: %s", err) } diff --git a/lib/boot/seed.go b/lib/boot/seed.go index 9f086d5445..650c836886 100644 --- a/lib/boot/seed.go +++ b/lib/boot/seed.go @@ -8,18 +8,19 @@ import ( "context" ) +// Populate a blank database with arvados tables and seed rows. type seedDatabase struct{} func (seedDatabase) String() string { return "seedDatabase" } -func (seedDatabase) Run(ctx context.Context, fail func(error), boot *Booter) error { - err := boot.wait(ctx, runPostgreSQL{}, installPassenger{src: "services/api"}) +func (seedDatabase) Run(ctx context.Context, fail func(error), super *Supervisor) error { + err := super.wait(ctx, runPostgreSQL{}, installPassenger{src: "services/api"}) if err != nil { return err } - err = boot.RunProgram(ctx, "services/api", nil, nil, "bundle", "exec", "rake", "db:setup") + err = super.RunProgram(ctx, "services/api", nil, nil, "bundle", "exec", "rake", "db:setup") if err != nil { return err } diff --git a/lib/boot/service.go b/lib/boot/service.go index 0ebf647adf..018e9f8bb0 100644 --- a/lib/boot/service.go +++ b/lib/boot/service.go @@ -6,41 +6,52 @@ package boot import ( "context" + "errors" "path/filepath" "git.arvados.org/arvados.git/sdk/go/arvados" ) +// Run a service using the arvados-server binary. +// +// In future this will bring up the service in the current process, +// but for now (at least until the subcommand handlers get a shutdown +// mechanism) it starts a child process using the arvados-server +// binary, which the supervisor is assumed to have installed in +// {super.tempdir}/bin/. type runServiceCommand struct { - name string - svc arvados.Service - depends []bootTask + name string // arvados-server subcommand, e.g., "controller" + svc arvados.Service // cluster.Services.* entry with the desired InternalURLs + depends []supervisedTask // wait for these tasks before starting } func (runner runServiceCommand) String() string { return runner.name } -func (runner runServiceCommand) Run(ctx context.Context, fail func(error), boot *Booter) error { - boot.wait(ctx, runner.depends...) - binfile := filepath.Join(boot.tempdir, "bin", "arvados-server") - err := boot.RunProgram(ctx, boot.tempdir, nil, nil, binfile, "-version") +func (runner runServiceCommand) Run(ctx context.Context, fail func(error), super *Supervisor) error { + binfile := filepath.Join(super.tempdir, "bin", "arvados-server") + err := super.RunProgram(ctx, super.tempdir, nil, nil, binfile, "-version") if err != nil { return err } - go func() { - var u arvados.URL - for u = range runner.svc.InternalURLs { - } - fail(boot.RunProgram(ctx, boot.tempdir, nil, []string{"ARVADOS_SERVICE_INTERNAL_URL=" + u.String()}, binfile, runner.name, "-config", boot.configfile)) - }() + super.wait(ctx, runner.depends...) + for u := range runner.svc.InternalURLs { + u := u + super.waitShutdown.Add(1) + go func() { + defer super.waitShutdown.Done() + fail(super.RunProgram(ctx, super.tempdir, nil, []string{"ARVADOS_SERVICE_INTERNAL_URL=" + u.String()}, binfile, runner.name, "-config", super.configfile)) + }() + } return nil } +// Run a Go service that isn't bundled in arvados-server. type runGoProgram struct { - src string - svc arvados.Service - depends []bootTask + src string // source dir, e.g., "services/keepproxy" + svc arvados.Service // cluster.Services.* entry with the desired InternalURLs + depends []supervisedTask // wait for these tasks before starting } func (runner runGoProgram) String() string { @@ -48,9 +59,12 @@ func (runner runGoProgram) String() string { return basename } -func (runner runGoProgram) Run(ctx context.Context, fail func(error), boot *Booter) error { - boot.wait(ctx, runner.depends...) - binfile, err := boot.installGoProgram(ctx, runner.src) +func (runner runGoProgram) Run(ctx context.Context, fail func(error), super *Supervisor) error { + if len(runner.svc.InternalURLs) == 0 { + return errors.New("bug: runGoProgram needs non-empty svc.InternalURLs") + } + + binfile, err := super.installGoProgram(ctx, runner.src) if err != nil { return err } @@ -58,26 +72,18 @@ func (runner runGoProgram) Run(ctx context.Context, fail func(error), boot *Boot return ctx.Err() } - err = boot.RunProgram(ctx, boot.tempdir, nil, nil, binfile, "-version") + err = super.RunProgram(ctx, super.tempdir, nil, nil, binfile, "-version") if err != nil { return err } - if len(runner.svc.InternalURLs) > 0 { - // Run one for each URL - for u := range runner.svc.InternalURLs { - u := u - boot.waitShutdown.Add(1) - go func() { - defer boot.waitShutdown.Done() - fail(boot.RunProgram(ctx, boot.tempdir, nil, []string{"ARVADOS_SERVICE_INTERNAL_URL=" + u.String()}, binfile)) - }() - } - } else { - // Just run one - boot.waitShutdown.Add(1) + + super.wait(ctx, runner.depends...) + for u := range runner.svc.InternalURLs { + u := u + super.waitShutdown.Add(1) go func() { - defer boot.waitShutdown.Done() - fail(boot.RunProgram(ctx, boot.tempdir, nil, nil, binfile)) + defer super.waitShutdown.Done() + fail(super.RunProgram(ctx, super.tempdir, nil, []string{"ARVADOS_SERVICE_INTERNAL_URL=" + u.String()}, binfile)) }() } return nil diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go new file mode 100644 index 0000000000..59e1ae352e --- /dev/null +++ b/lib/boot/supervisor.go @@ -0,0 +1,690 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package boot + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "os" + "os/exec" + "os/signal" + "os/user" + "path/filepath" + "strings" + "sync" + "syscall" + "time" + + "git.arvados.org/arvados.git/lib/service" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/sdk/go/health" + "github.com/sirupsen/logrus" +) + +type Supervisor struct { + SourcePath string // e.g., /home/username/src/arvados + SourceVersion string // e.g., acbd1324... + LibPath string // e.g., /var/lib/arvados + ClusterType string // e.g., production + ListenHost string // e.g., localhost + ControllerAddr string // e.g., 127.0.0.1:8000 + OwnTemporaryDatabase bool + Stderr io.Writer + + logger logrus.FieldLogger + cluster *arvados.Cluster + + ctx context.Context + cancel context.CancelFunc + done chan struct{} + healthChecker *health.Aggregator + tasksReady map[string]chan bool + waitShutdown sync.WaitGroup + + tempdir string + configfile string + environ []string // for child processes +} + +func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config) { + super.ctx, super.cancel = context.WithCancel(ctx) + super.done = make(chan struct{}) + + go func() { + sigch := make(chan os.Signal) + signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(sigch) + go func() { + for sig := range sigch { + super.logger.WithField("signal", sig).Info("caught signal") + super.cancel() + } + }() + + err := super.run(cfg) + if err != nil { + fmt.Fprintln(super.Stderr, err) + } + close(super.done) + }() +} + +func (super *Supervisor) run(cfg *arvados.Config) error { + cwd, err := os.Getwd() + if err != nil { + return err + } + if !strings.HasPrefix(super.SourcePath, "/") { + super.SourcePath = filepath.Join(cwd, super.SourcePath) + } + super.SourcePath, err = filepath.EvalSymlinks(super.SourcePath) + if err != nil { + return err + } + + super.tempdir, err = ioutil.TempDir("", "arvados-server-boot-") + if err != nil { + return err + } + defer os.RemoveAll(super.tempdir) + if err := os.Mkdir(filepath.Join(super.tempdir, "bin"), 0777); err != nil { + return err + } + + // Fill in any missing config keys, and write the resulting + // config in the temp dir for child services to use. + err = super.autofillConfig(cfg, super.logger) + if err != nil { + return err + } + conffile, err := os.OpenFile(filepath.Join(super.tempdir, "config.yml"), os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + return err + } + defer conffile.Close() + err = json.NewEncoder(conffile).Encode(cfg) + if err != nil { + return err + } + err = conffile.Close() + if err != nil { + return err + } + super.configfile = conffile.Name() + + super.environ = os.Environ() + super.cleanEnv() + super.setEnv("ARVADOS_CONFIG", super.configfile) + super.setEnv("RAILS_ENV", super.ClusterType) + super.setEnv("TMPDIR", super.tempdir) + super.prependEnv("PATH", filepath.Join(super.tempdir, "bin")+":") + super.prependEnv("PATH", filepath.Join(super.LibPath, "bin")+":") + + super.cluster, err = cfg.GetCluster("") + if err != nil { + return err + } + // Now that we have the config, replace the bootstrap logger + // with a new one according to the logging config. + loglevel := super.cluster.SystemLogs.LogLevel + if s := os.Getenv("ARVADOS_DEBUG"); s != "" && s != "0" { + loglevel = "debug" + } + super.logger = ctxlog.New(super.Stderr, super.cluster.SystemLogs.Format, loglevel).WithFields(logrus.Fields{ + "PID": os.Getpid(), + }) + + if super.SourceVersion == "" { + // Find current source tree version. + var buf bytes.Buffer + err = super.RunProgram(super.ctx, ".", &buf, nil, "git", "diff", "--shortstat") + if err != nil { + return err + } + dirty := buf.Len() > 0 + buf.Reset() + err = super.RunProgram(super.ctx, ".", &buf, nil, "git", "log", "-n1", "--format=%H") + if err != nil { + return err + } + super.SourceVersion = strings.TrimSpace(buf.String()) + if dirty { + super.SourceVersion += "+uncommitted" + } + } else { + return errors.New("specifying a version to run is not yet supported") + } + for _, dir := range []string{super.LibPath, filepath.Join(super.LibPath, "bin")} { + if _, err = os.Stat(filepath.Join(dir, ".")); os.IsNotExist(err) { + err = os.Mkdir(dir, 0755) + if err != nil { + return err + } + } else if err != nil { + return err + } + } + _, err = super.installGoProgram(super.ctx, "cmd/arvados-server") + if err != nil { + return err + } + err = super.setupRubyEnv() + if err != nil { + return err + } + + tasks := []supervisedTask{ + createCertificates{}, + runPostgreSQL{}, + runNginx{}, + runServiceCommand{name: "controller", svc: super.cluster.Services.Controller, depends: []supervisedTask{runPostgreSQL{}}}, + runGoProgram{src: "services/arv-git-httpd", svc: super.cluster.Services.GitHTTP}, + runGoProgram{src: "services/health", svc: super.cluster.Services.Health}, + runGoProgram{src: "services/keepproxy", svc: super.cluster.Services.Keepproxy, depends: []supervisedTask{runPassenger{src: "services/api"}}}, + runGoProgram{src: "services/keepstore", svc: super.cluster.Services.Keepstore}, + runGoProgram{src: "services/keep-web", svc: super.cluster.Services.WebDAV}, + runGoProgram{src: "services/ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{runPostgreSQL{}}}, + installPassenger{src: "services/api"}, + runPassenger{src: "services/api", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, runPostgreSQL{}, installPassenger{src: "services/api"}}}, + installPassenger{src: "apps/workbench", depends: []supervisedTask{installPassenger{src: "services/api"}}}, // dependency ensures workbench doesn't delay api startup + runPassenger{src: "apps/workbench", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench"}}}, + seedDatabase{}, + } + if super.ClusterType != "test" { + tasks = append(tasks, + runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.Controller}, + runGoProgram{src: "services/keep-balance"}, + ) + } + super.tasksReady = map[string]chan bool{} + for _, task := range tasks { + super.tasksReady[task.String()] = make(chan bool) + } + for _, task := range tasks { + task := task + fail := func(err error) { + if super.ctx.Err() != nil { + return + } + super.cancel() + super.logger.WithField("task", task.String()).WithError(err).Error("task failed") + } + go func() { + super.logger.WithField("task", task.String()).Info("starting") + err := task.Run(super.ctx, fail, super) + if err != nil { + fail(err) + return + } + close(super.tasksReady[task.String()]) + }() + } + err = super.wait(super.ctx, tasks...) + if err != nil { + return err + } + super.logger.Info("all startup tasks are complete; starting health checks") + super.healthChecker = &health.Aggregator{Cluster: super.cluster} + <-super.ctx.Done() + super.logger.Info("shutting down") + super.waitShutdown.Wait() + return super.ctx.Err() +} + +func (super *Supervisor) wait(ctx context.Context, tasks ...supervisedTask) error { + for _, task := range tasks { + ch, ok := super.tasksReady[task.String()] + if !ok { + return fmt.Errorf("no such task: %s", task) + } + super.logger.WithField("task", task.String()).Info("waiting") + select { + case <-ch: + super.logger.WithField("task", task.String()).Info("ready") + case <-ctx.Done(): + super.logger.WithField("task", task.String()).Info("task was never ready") + return ctx.Err() + } + } + return nil +} + +func (super *Supervisor) Stop() { + super.cancel() + <-super.done +} + +func (super *Supervisor) WaitReady() (*arvados.URL, bool) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for waiting := "all"; waiting != ""; { + select { + case <-ticker.C: + case <-super.ctx.Done(): + return nil, false + } + if super.healthChecker == nil { + // not set up yet + continue + } + resp := super.healthChecker.ClusterHealth() + // The overall health check (resp.Health=="OK") might + // never pass due to missing components (like + // arvados-dispatch-cloud in a test cluster), so + // instead we wait for all configured components to + // pass. + waiting = "" + for target, check := range resp.Checks { + if check.Health != "OK" { + waiting += " " + target + } + } + if waiting != "" { + super.logger.WithField("targets", waiting[1:]).Info("waiting") + } + } + u := super.cluster.Services.Controller.ExternalURL + return &u, true +} + +func (super *Supervisor) prependEnv(key, prepend string) { + for i, s := range super.environ { + if strings.HasPrefix(s, key+"=") { + super.environ[i] = key + "=" + prepend + s[len(key)+1:] + return + } + } + super.environ = append(super.environ, key+"="+prepend) +} + +var cleanEnvPrefixes = []string{ + "GEM_HOME=", + "GEM_PATH=", + "ARVADOS_", +} + +func (super *Supervisor) cleanEnv() { + var cleaned []string + for _, s := range super.environ { + drop := false + for _, p := range cleanEnvPrefixes { + if strings.HasPrefix(s, p) { + drop = true + break + } + } + if !drop { + cleaned = append(cleaned, s) + } + } + super.environ = cleaned +} + +func (super *Supervisor) setEnv(key, val string) { + for i, s := range super.environ { + if strings.HasPrefix(s, key+"=") { + super.environ[i] = key + "=" + val + return + } + } + super.environ = append(super.environ, key+"="+val) +} + +// Remove all but the first occurrence of each env var. +func dedupEnv(in []string) []string { + saw := map[string]bool{} + var out []string + for _, kv := range in { + if split := strings.Index(kv, "="); split < 1 { + panic("invalid environment var: " + kv) + } else if saw[kv[:split]] { + continue + } else { + saw[kv[:split]] = true + out = append(out, kv) + } + } + return out +} + +func (super *Supervisor) installGoProgram(ctx context.Context, srcpath string) (string, error) { + _, basename := filepath.Split(srcpath) + bindir := filepath.Join(super.tempdir, "bin") + binfile := filepath.Join(bindir, basename) + err := super.RunProgram(ctx, filepath.Join(super.SourcePath, srcpath), nil, []string{"GOBIN=" + bindir}, "go", "install", "-ldflags", "-X git.arvados.org/arvados.git/lib/cmd.version="+super.SourceVersion+" -X main.version="+super.SourceVersion) + return binfile, err +} + +func (super *Supervisor) setupRubyEnv() error { + cmd := exec.Command("gem", "env", "gempath") + cmd.Env = super.environ + buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:... + if err != nil || len(buf) == 0 { + return fmt.Errorf("gem env gempath: %v", err) + } + gempath := string(bytes.Split(buf, []byte{':'})[0]) + super.prependEnv("PATH", gempath+"/bin:") + super.setEnv("GEM_HOME", gempath) + super.setEnv("GEM_PATH", gempath) + // Passenger install doesn't work unless $HOME is ~user + u, err := user.Current() + if err != nil { + return err + } + super.setEnv("HOME", u.HomeDir) + return nil +} + +func (super *Supervisor) lookPath(prog string) string { + for _, val := range super.environ { + if strings.HasPrefix(val, "PATH=") { + for _, dir := range filepath.SplitList(val[5:]) { + path := filepath.Join(dir, prog) + if fi, err := os.Stat(path); err == nil && fi.Mode()&0111 != 0 { + return path + } + } + } + } + return prog +} + +// Run prog with args, using dir as working directory. If ctx is +// cancelled while the child is running, RunProgram terminates the +// child, waits for it to exit, then returns. +// +// Child's environment will have our env vars, plus any given in env. +// +// Child's stdout will be written to output if non-nil, otherwise the +// boot command's stderr. +func (super *Supervisor) RunProgram(ctx context.Context, dir string, output io.Writer, env []string, prog string, args ...string) error { + cmdline := fmt.Sprintf("%s", append([]string{prog}, args...)) + super.logger.WithField("command", cmdline).WithField("dir", dir).Info("executing") + + logprefix := strings.TrimPrefix(prog, super.tempdir+"/bin/") + if prog == "bundle" && len(args) > 2 && args[0] == "exec" { + logprefix = args[1] + } else if prog == "arvados-server" && len(args) > 1 { + logprefix = args[0] + } + if !strings.HasPrefix(dir, "/") { + logprefix = dir + ": " + logprefix + } + + cmd := exec.Command(super.lookPath(prog), args...) + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + stderr, err := cmd.StderrPipe() + if err != nil { + return err + } + logwriter := &service.LogPrefixer{Writer: super.Stderr, Prefix: []byte("[" + logprefix + "] ")} + var copiers sync.WaitGroup + copiers.Add(1) + go func() { + io.Copy(logwriter, stderr) + copiers.Done() + }() + copiers.Add(1) + go func() { + if output == nil { + io.Copy(logwriter, stdout) + } else { + io.Copy(output, stdout) + } + copiers.Done() + }() + + if strings.HasPrefix(dir, "/") { + cmd.Dir = dir + } else { + cmd.Dir = filepath.Join(super.SourcePath, dir) + } + env = append([]string(nil), env...) + env = append(env, super.environ...) + cmd.Env = dedupEnv(env) + + exited := false + defer func() { exited = true }() + go func() { + <-ctx.Done() + log := ctxlog.FromContext(ctx).WithFields(logrus.Fields{"dir": dir, "cmdline": cmdline}) + for !exited { + if cmd.Process == nil { + log.Debug("waiting for child process to start") + time.Sleep(time.Second / 2) + } else { + log.WithField("PID", cmd.Process.Pid).Debug("sending SIGTERM") + cmd.Process.Signal(syscall.SIGTERM) + time.Sleep(5 * time.Second) + if !exited { + stdout.Close() + stderr.Close() + log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM") + } + } + } + }() + + err = cmd.Start() + if err != nil { + return err + } + copiers.Wait() + err = cmd.Wait() + if ctx.Err() != nil { + // Return "context canceled", instead of the "killed" + // error that was probably caused by the context being + // canceled. + return ctx.Err() + } else if err != nil { + return fmt.Errorf("%s: error: %v", cmdline, err) + } + return nil +} + +func (super *Supervisor) autofillConfig(cfg *arvados.Config, log logrus.FieldLogger) error { + cluster, err := cfg.GetCluster("") + if err != nil { + return err + } + usedPort := map[string]bool{} + nextPort := func() string { + for { + port, err := availablePort(super.ListenHost + ":0") + if err != nil { + panic(err) + } + if usedPort[port] { + continue + } + usedPort[port] = true + return port + } + } + if cluster.Services.Controller.ExternalURL.Host == "" { + h, p, err := net.SplitHostPort(super.ControllerAddr) + if err != nil { + return err + } + if h == "" { + h = super.ListenHost + } + if p == "0" { + p, err = availablePort(":0") + if err != nil { + return err + } + usedPort[p] = true + } + cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p)} + } + for _, svc := range []*arvados.Service{ + &cluster.Services.Controller, + &cluster.Services.DispatchCloud, + &cluster.Services.GitHTTP, + &cluster.Services.Health, + &cluster.Services.Keepproxy, + &cluster.Services.Keepstore, + &cluster.Services.RailsAPI, + &cluster.Services.WebDAV, + &cluster.Services.WebDAVDownload, + &cluster.Services.Websocket, + &cluster.Services.Workbench1, + } { + if svc == &cluster.Services.DispatchCloud && super.ClusterType == "test" { + continue + } + if svc.ExternalURL.Host == "" && (svc == &cluster.Services.Controller || + svc == &cluster.Services.GitHTTP || + svc == &cluster.Services.Keepproxy || + svc == &cluster.Services.WebDAV || + svc == &cluster.Services.WebDAVDownload || + svc == &cluster.Services.Websocket || + svc == &cluster.Services.Workbench1) { + svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort())} + } + if len(svc.InternalURLs) == 0 { + svc.InternalURLs = map[arvados.URL]arvados.ServiceInstance{ + arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort())}: arvados.ServiceInstance{}, + } + } + } + if cluster.SystemRootToken == "" { + cluster.SystemRootToken = randomHexString(64) + } + if cluster.ManagementToken == "" { + cluster.ManagementToken = randomHexString(64) + } + if cluster.API.RailsSessionSecretToken == "" { + cluster.API.RailsSessionSecretToken = randomHexString(64) + } + if cluster.Collections.BlobSigningKey == "" { + cluster.Collections.BlobSigningKey = randomHexString(64) + } + if super.ClusterType != "production" && cluster.Containers.DispatchPrivateKey == "" { + buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch")) + if err != nil { + return err + } + cluster.Containers.DispatchPrivateKey = string(buf) + } + if super.ClusterType != "production" { + cluster.TLS.Insecure = true + } + if super.ClusterType == "test" { + // Add a second keepstore process. + cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort())}] = arvados.ServiceInstance{} + + // Create a directory-backed volume for each keepstore + // process. + cluster.Volumes = map[string]arvados.Volume{} + for url := range cluster.Services.Keepstore.InternalURLs { + volnum := len(cluster.Volumes) + datadir := fmt.Sprintf("%s/keep%d.data", super.tempdir, volnum) + if _, err = os.Stat(datadir + "/."); err == nil { + } else if !os.IsNotExist(err) { + return err + } else if err = os.Mkdir(datadir, 0777); err != nil { + return err + } + cluster.Volumes[fmt.Sprintf(cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{ + Driver: "Directory", + DriverParameters: json.RawMessage(fmt.Sprintf(`{"Root":%q}`, datadir)), + AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{ + url: {}, + }, + } + } + } + if super.OwnTemporaryDatabase { + cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{ + "client_encoding": "utf8", + "host": "localhost", + "port": nextPort(), + "dbname": "arvados_test", + "user": "arvados", + "password": "insecure_arvados_test", + } + } + + cfg.Clusters[cluster.ClusterID] = *cluster + return nil +} + +func randomHexString(chars int) string { + b := make([]byte, chars/2) + _, err := rand.Read(b) + if err != nil { + panic(err) + } + return fmt.Sprintf("%x", b) +} + +func internalPort(svc arvados.Service) (string, error) { + for u := range svc.InternalURLs { + if _, p, err := net.SplitHostPort(u.Host); err != nil { + return "", err + } else if p != "" { + return p, nil + } else if u.Scheme == "https" { + return "443", nil + } else { + return "80", nil + } + } + return "", fmt.Errorf("service has no InternalURLs") +} + +func externalPort(svc arvados.Service) (string, error) { + if _, p, err := net.SplitHostPort(svc.ExternalURL.Host); err != nil { + return "", err + } else if p != "" { + return p, nil + } else if svc.ExternalURL.Scheme == "https" { + return "443", nil + } else { + return "80", nil + } +} + +func availablePort(addr string) (string, error) { + ln, err := net.Listen("tcp", addr) + if err != nil { + return "", err + } + defer ln.Close() + _, port, err := net.SplitHostPort(ln.Addr().String()) + if err != nil { + return "", err + } + return port, nil +} + +// Try to connect to addr until it works, then close ch. Give up if +// ctx cancels. +func waitForConnect(ctx context.Context, addr string) error { + dialer := net.Dialer{Timeout: time.Second} + for ctx.Err() == nil { + conn, err := dialer.DialContext(ctx, "tcp", addr) + if err != nil { + time.Sleep(time.Second / 10) + continue + } + conn.Close() + return nil + } + return ctx.Err() +} diff --git a/lib/controller/integration_test.go b/lib/controller/integration_test.go index e679106dce..e308f8726b 100644 --- a/lib/controller/integration_test.go +++ b/lib/controller/integration_test.go @@ -27,7 +27,7 @@ import ( var _ = check.Suite(&IntegrationSuite{}) type testCluster struct { - booter boot.Booter + super boot.Supervisor config arvados.Config controllerURL *url.URL } @@ -102,7 +102,7 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) { cfg, err := loader.Load() c.Assert(err, check.IsNil) s.testClusters[id] = &testCluster{ - booter: boot.Booter{ + super: boot.Supervisor{ SourcePath: filepath.Join(cwd, "..", ".."), LibPath: filepath.Join(cwd, "..", "..", "tmp"), ClusterType: "test", @@ -113,10 +113,10 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) { }, config: *cfg, } - s.testClusters[id].booter.Start(context.Background(), &s.testClusters[id].config) + s.testClusters[id].super.Start(context.Background(), &s.testClusters[id].config) } for _, tc := range s.testClusters { - au, ok := tc.booter.WaitReady() + au, ok := tc.super.WaitReady() c.Assert(ok, check.Equals, true) u := url.URL(*au) tc.controllerURL = &u @@ -125,7 +125,7 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) { func (s *IntegrationSuite) TearDownSuite(c *check.C) { for _, c := range s.testClusters { - c.booter.Stop() + c.super.Stop() } } diff --git a/lib/service/cmd.go b/lib/service/cmd.go index 48912b8898..7f2f78ee9a 100644 --- a/lib/service/cmd.go +++ b/lib/service/cmd.go @@ -59,7 +59,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout var err error defer func() { if err != nil { - log.WithError(err).Info("exiting") + log.WithError(err).Error("exiting") } }() -- 2.30.2