"path/filepath"
)
-func createCertificates(ctx context.Context, boot *Booter, ready chan<- bool) error {
+type createCertificates struct{}
+
+func (createCertificates) String() string {
+ return "certificates"
+}
+
+func (createCertificates) Run(ctx context.Context, fail func(error), boot *Booter) error {
// Generate root key
err := boot.RunProgram(ctx, boot.tempdir, nil, nil, "openssl", "genrsa", "-out", "rootCA.key", "4096")
if err != nil {
if err != nil {
return err
}
-
- close(ready)
- <-ctx.Done()
return nil
}
var Command cmd.Handler = bootCommand{}
+type bootTask interface {
+ // Execute the task. Run should return nil when the task is
+ // done enough to satisfy a dependency relationship (e.g., the
+ // service is running and ready). If the task starts a
+ // goroutine that fails after Run returns (e.g., the service
+ // shuts down), it should call fail with the error.
+ Run(ctx context.Context, fail func(error), boot *Booter) error
+ String() string // task name; used as the key into Booter.tasksReady
+}
+
type bootCommand struct{}
func (bootCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
cancel context.CancelFunc
done chan struct{}
healthChecker *health.Aggregator
+ tasksReady map[string]chan bool
tempdir string
configfile string
return err
}
- var wg sync.WaitGroup
- components := map[string]*component{
- "certificates": &component{runFunc: createCertificates},
- "database": &component{runFunc: runPostgres, depends: []string{"certificates"}},
- "nginx": &component{runFunc: runNginx},
- "controller": &component{cmdHandler: controller.Command, depends: []string{"database"}},
- "dispatchcloud": &component{cmdHandler: dispatchcloud.Command, notIfTest: true},
- "git-httpd": &component{goProg: "services/arv-git-httpd"},
- "health": &component{goProg: "services/health"},
- "keep-balance": &component{goProg: "services/keep-balance", notIfTest: true},
- "keepproxy": &component{goProg: "services/keepproxy"},
- "keepstore": &component{goProg: "services/keepstore", svc: boot.cluster.Services.Keepstore},
- "keep-web": &component{goProg: "services/keep-web"},
- "railsAPI": &component{svc: boot.cluster.Services.RailsAPI, railsApp: "services/api", depends: []string{"database"}},
- "workbench1": &component{svc: boot.cluster.Services.Workbench1, railsApp: "apps/workbench"},
- "ws": &component{goProg: "services/ws", depends: []string{"database"}},
- }
- for _, cmpt := range components {
- cmpt.ready = make(chan bool)
- }
- for name, cmpt := range components {
- name, cmpt := name, cmpt
- wg.Add(1)
- go func() {
- defer wg.Done()
- defer boot.cancel()
- for _, dep := range cmpt.depends {
- boot.logger.WithField("component", name).WithField("dependency", dep).Info("waiting")
- select {
- case <-components[dep].ready:
- case <-boot.ctx.Done():
- return
- }
+ tasks := []bootTask{
+ createCertificates{},
+ runPostgreSQL{},
+ runNginx{},
+ runServiceCommand{name: "controller", command: controller.Command, depends: []bootTask{runPostgreSQL{}}},
+ runGoProgram{src: "services/arv-git-httpd"},
+ runGoProgram{src: "services/health"},
+ runGoProgram{src: "services/keepproxy"},
+ runGoProgram{src: "services/keepstore", svc: boot.cluster.Services.Keepstore},
+ runGoProgram{src: "services/keep-web"},
+ runGoProgram{src: "services/ws", depends: []bootTask{runPostgreSQL{}}},
+ installPassenger{src: "services/api"},
+ runPassenger{src: "services/api", svc: boot.cluster.Services.RailsAPI, depends: []bootTask{createCertificates{}, runPostgreSQL{}, installPassenger{src: "services/api"}}},
+ installPassenger{src: "apps/workbench"},
+ runPassenger{src: "apps/workbench", svc: boot.cluster.Services.Workbench1, depends: []bootTask{installPassenger{src: "apps/workbench"}}},
+ seedDatabase{},
+ }
+ if boot.ClusterType != "test" {
+ tasks = append(tasks,
+ runServiceCommand{name: "dispatchcloud", command: dispatchcloud.Command},
+ runGoProgram{src: "services/keep-balance"},
+ )
+ }
+ boot.tasksReady = map[string]chan bool{}
+ for _, task := range tasks {
+ boot.tasksReady[task.String()] = make(chan bool)
+ }
+ for _, task := range tasks {
+ task := task
+ fail := func(err error) {
+ if boot.ctx.Err() != nil {
+ return
}
- boot.logger.WithField("component", name).Info("starting")
- err := cmpt.Run(boot.ctx, name, boot)
- if err != nil && err != context.Canceled {
- boot.logger.WithError(err).WithField("component", name).Error("exited")
+ boot.cancel()
+ boot.logger.WithField("task", task).WithError(err).Error("task failed")
+ }
+ go func() {
+ boot.logger.WithField("task", task).Info("starting")
+ err := task.Run(boot.ctx, fail, boot)
+ if err != nil {
+ fail(err)
+ return
}
+ close(boot.tasksReady[task.String()])
}()
}
- wg.Wait()
+ return boot.wait(boot.ctx, tasks...)
+}
+
+// wait blocks until each of the given tasks is ready (its channel in
+// boot.tasksReady can be received from, which happens when the
+// channel is closed after the task's Run returns nil), checking the
+// tasks in the order given.
+//
+// It returns an error if a task has no entry in boot.tasksReady, and
+// ctx.Err() if ctx is done before all of the tasks are ready.
+func (boot *Booter) wait(ctx context.Context, tasks ...bootTask) error {
+	for _, dep := range tasks {
+		ready, found := boot.tasksReady[dep.String()]
+		if !found {
+			return fmt.Errorf("no such task: %s", dep)
+		}
+		boot.logger.WithField("task", dep).Info("waiting")
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ready:
+		}
+	}
 	return nil
 }
return nil
}
-type component struct {
- name string
- svc arvados.Service
- cmdHandler cmd.Handler
- runFunc func(ctx context.Context, boot *Booter, ready chan<- bool) error
- railsApp string // source dir in arvados tree, e.g., "services/api"
- goProg string // source dir in arvados tree, e.g., "services/keepstore"
- notIfTest bool // don't run this component on a test cluster
- depends []string // don't start until all of these components are ready
-
- ready chan bool
-}
-
-func (cmpt *component) Run(ctx context.Context, name string, boot *Booter) error {
- if cmpt.notIfTest && boot.ClusterType == "test" {
- fmt.Fprintf(boot.Stderr, "skipping component %q in %s mode\n", name, boot.ClusterType)
- <-ctx.Done()
- return nil
- }
- fmt.Fprintf(boot.Stderr, "starting component %q\n", name)
- if cmpt.cmdHandler != nil {
- errs := make(chan error, 1)
- go func() {
- defer close(errs)
- exitcode := cmpt.cmdHandler.RunCommand(name, []string{"-config", boot.configfile}, bytes.NewBuffer(nil), boot.Stderr, boot.Stderr)
- if exitcode != 0 {
- errs <- fmt.Errorf("exit code %d", exitcode)
- }
- }()
- select {
- case err := <-errs:
- return err
- case <-ctx.Done():
- // cmpt.cmdHandler.RunCommand() doesn't have
- // access to our context, so it won't shut
- // down by itself. We just abandon it.
- return nil
- }
- }
- if cmpt.goProg != "" {
- boot.RunProgram(ctx, cmpt.goProg, nil, nil, "go", "install")
- if ctx.Err() != nil {
- return nil
- }
- _, basename := filepath.Split(cmpt.goProg)
- if len(cmpt.svc.InternalURLs) > 0 {
- // Run one for each URL
- var wg sync.WaitGroup
- for u := range cmpt.svc.InternalURLs {
- u := u
- wg.Add(1)
- go func() {
- defer wg.Done()
- boot.RunProgram(ctx, boot.tempdir, nil, []string{"ARVADOS_SERVICE_INTERNAL_URL=" + u.String()}, basename)
- }()
- }
- wg.Wait()
- } else {
- // Just run one
- boot.RunProgram(ctx, boot.tempdir, nil, nil, basename)
- }
- return nil
- }
- if cmpt.runFunc != nil {
- return cmpt.runFunc(ctx, boot, cmpt.ready)
- }
- if cmpt.railsApp != "" {
- port, err := internalPort(cmpt.svc)
- if err != nil {
- return fmt.Errorf("bug: no InternalURLs for component %q: %v", name, cmpt.svc.InternalURLs)
- }
- var buf bytes.Buffer
- err = boot.RunProgram(ctx, cmpt.railsApp, &buf, nil, "gem", "list", "--details", "bundler")
- if err != nil {
- return err
- }
- for _, version := range []string{"1.11.0", "1.17.3", "2.0.2"} {
- if !strings.Contains(buf.String(), "("+version+")") {
- err = boot.RunProgram(ctx, cmpt.railsApp, nil, nil, "gem", "install", "--user", "bundler:1.11", "bundler:1.17.3", "bundler:2.0.2")
- if err != nil {
- return err
- }
- break
- }
- }
- err = boot.RunProgram(ctx, cmpt.railsApp, nil, nil, "bundle", "install", "--jobs", "4", "--path", filepath.Join(os.Getenv("HOME"), ".gem"))
- if err != nil {
- return err
- }
- err = boot.RunProgram(ctx, cmpt.railsApp, nil, nil, "bundle", "exec", "passenger-config", "build-native-support")
- if err != nil {
- return err
- }
- err = boot.RunProgram(ctx, cmpt.railsApp, nil, nil, "bundle", "exec", "passenger-config", "install-standalone-runtime")
- if err != nil {
- return err
- }
- err = boot.RunProgram(ctx, cmpt.railsApp, nil, nil, "bundle", "exec", "passenger-config", "validate-install")
- if err != nil {
- return err
- }
- err = boot.RunProgram(ctx, cmpt.railsApp, nil, nil, "bundle", "exec", "passenger", "start", "-p", port)
- if err != nil {
- return err
- }
- return nil
- }
- return fmt.Errorf("bug: component %q has nothing to run", name)
-}
-
func (boot *Booter) autofillConfig(cfg *arvados.Config, log logrus.FieldLogger) error {
cluster, err := cfg.GetCluster("")
if err != nil {
// Try to connect to addr until it works, then close ch. Give up if
// ctx cancels.
-func connectAndClose(ctx context.Context, addr string, ch chan<- bool) {
+func waitForConnect(ctx context.Context, addr string) error {
dialer := net.Dialer{Timeout: time.Second}
for ctx.Err() == nil {
conn, err := dialer.DialContext(ctx, "tcp", addr)
continue
}
conn.Close()
- close(ch)
- return
+ return nil
}
+ return ctx.Err()
}
"git.arvados.org/arvados.git/sdk/go/arvados"
)
-func runNginx(ctx context.Context, boot *Booter, ready chan<- bool) error {
+type runNginx struct{}
+
+func (runNginx) String() string {
+ return "nginx"
+}
+
+func (runNginx) Run(ctx context.Context, fail func(error), boot *Booter) error {
vars := map[string]string{
"SSLCERT": filepath.Join(boot.SourcePath, "services", "api", "tmp", "self-signed.pem"), // TODO: root ca
"SSLKEY": filepath.Join(boot.SourcePath, "services", "api", "tmp", "self-signed.key"), // TODO: root ca
}
}
}
- go connectAndClose(ctx, boot.cluster.Services.Controller.ExternalURL.Host, ready)
- return boot.RunProgram(ctx, ".", nil, nil, nginx,
- "-g", "error_log stderr info;",
- "-g", "pid "+filepath.Join(boot.tempdir, "nginx.pid")+";",
- "-c", conffile)
+ go func() {
+ fail(boot.RunProgram(ctx, ".", nil, nil, nginx,
+ "-g", "error_log stderr info;",
+ "-g", "pid "+filepath.Join(boot.tempdir, "nginx.pid")+";",
+ "-c", conffile))
+ }()
+ return waitForConnect(ctx, boot.cluster.Services.Controller.ExternalURL.Host)
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package boot
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+)
+
+// installPassenger is a boot task that prepares the Rails app in src
+// to be run with Passenger: it makes sure the required bundler
+// versions are installed, runs "bundle install", and then builds,
+// installs, and validates Passenger via passenger-config.
+type installPassenger struct {
+	src     string     // app source dir, e.g., "services/api"
+	depends []bootTask // tasks that must be ready before installing
+}
+
+// String returns the task name: "install " followed by the source
+// dir.
+func (runner installPassenger) String() string {
+	return "install " + runner.src
+}
+
+// Run waits for the task's dependencies, then runs the setup
+// commands in runner.src one after another, returning the first
+// error encountered. fail is not used: Run starts no background work.
+func (runner installPassenger) Run(ctx context.Context, fail func(error), boot *Booter) error {
+	if err := boot.wait(ctx, runner.depends...); err != nil {
+		return err
+	}
+
+	// Ask rubygems which bundler versions are already present.
+	var gemList bytes.Buffer
+	if err := boot.RunProgram(ctx, runner.src, &gemList, nil, "gem", "list", "--details", "bundler"); err != nil {
+		return err
+	}
+	installed := gemList.String()
+	for _, version := range []string{"1.11.0", "1.17.3", "2.0.2"} {
+		if strings.Contains(installed, "("+version+")") {
+			continue
+		}
+		// At least one version is missing: a single "gem
+		// install" covers all of them, so there is no need
+		// to check the remaining versions.
+		if err := boot.RunProgram(ctx, runner.src, nil, nil, "gem", "install", "--user", "bundler:1.11", "bundler:1.17.3", "bundler:2.0.2"); err != nil {
+			return err
+		}
+		break
+	}
+
+	// Remaining steps, in the order they must run.
+	steps := [][]string{
+		{"bundle", "install", "--jobs", "4", "--path", filepath.Join(os.Getenv("HOME"), ".gem")},
+		{"bundle", "exec", "passenger-config", "build-native-support"},
+		{"bundle", "exec", "passenger-config", "install-standalone-runtime"},
+		{"bundle", "exec", "passenger-config", "validate-install"},
+	}
+	for _, step := range steps {
+		if err := boot.RunProgram(ctx, runner.src, nil, nil, step[0], step[1:]...); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// runPassenger is a boot task that runs the Rails app in src with
+// "passenger start", on the port that internalPort returns for svc.
+type runPassenger struct {
+	src     string          // app source dir, e.g., "services/api"
+	svc     arvados.Service // service config passed to internalPort
+	depends []bootTask      // tasks that must be ready before starting
+}
+
+// String returns the task name: the last element of the source dir
+// path.
+func (runner runPassenger) String() string {
+	_, name := filepath.Split(runner.src)
+	return name
+}
+
+// Run waits for the task's dependencies, looks up the port for the
+// service, and runs "bundle exec passenger start" in runner.src,
+// returning whatever RunProgram returns.
+//
+// NOTE(review): unlike the other service runners, this calls
+// RunProgram directly rather than in a goroutine that reports to
+// fail, so Run presumably does not return until passenger exits --
+// confirm that this is intended.
+func (runner runPassenger) Run(ctx context.Context, fail func(error), boot *Booter) error {
+	if err := boot.wait(ctx, runner.depends...); err != nil {
+		return err
+	}
+	port, err := internalPort(runner.svc)
+	if err != nil {
+		return fmt.Errorf("bug: no InternalURLs for component %q: %v", runner, runner.svc.InternalURLs)
+	}
+	return boot.RunProgram(ctx, runner.src, nil, nil, "bundle", "exec", "passenger", "start", "-p", port)
+}
"bytes"
"context"
"database/sql"
+ "fmt"
"os"
"os/exec"
"path/filepath"
"github.com/lib/pq"
)
-func runPostgres(ctx context.Context, boot *Booter, ready chan<- bool) error {
+type runPostgreSQL struct{}
+
+func (runPostgreSQL) String() string {
+ return "postgresql"
+}
+
+func (runPostgreSQL) Run(ctx context.Context, fail func(error), boot *Booter) error {
+ err := boot.wait(ctx, createCertificates{})
+ if err != nil {
+ return err
+ }
+
buf := bytes.NewBuffer(nil)
- err := boot.RunProgram(ctx, boot.tempdir, buf, nil, "pg_config", "--bindir")
+ err = boot.RunProgram(ctx, boot.tempdir, buf, nil, "pg_config", "--bindir")
if err != nil {
return err
}
port := boot.cluster.PostgreSQL.Connection["port"]
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
-
go func() {
- for {
- if ctx.Err() != nil {
- return
- }
- if exec.CommandContext(ctx, "pg_isready", "--timeout=10", "--host="+boot.cluster.PostgreSQL.Connection["host"], "--port="+port).Run() == nil {
- break
- }
- time.Sleep(time.Second / 2)
- }
- db, err := sql.Open("postgres", arvados.PostgreSQLConnection{
- "host": datadir,
- "port": port,
- "dbname": "postgres",
- }.String())
- if err != nil {
- boot.logger.WithError(err).Error("db open failed")
- cancel()
- return
- }
- defer db.Close()
- conn, err := db.Conn(ctx)
- if err != nil {
- boot.logger.WithError(err).Error("db conn failed")
- cancel()
- return
- }
- defer conn.Close()
- _, err = conn.ExecContext(ctx, `CREATE USER `+pq.QuoteIdentifier(boot.cluster.PostgreSQL.Connection["user"])+` WITH SUPERUSER ENCRYPTED PASSWORD `+pq.QuoteLiteral(boot.cluster.PostgreSQL.Connection["password"]))
- if err != nil {
- boot.logger.WithError(err).Error("createuser failed")
- cancel()
- return
- }
- _, err = conn.ExecContext(ctx, `CREATE DATABASE `+pq.QuoteIdentifier(boot.cluster.PostgreSQL.Connection["dbname"]))
- if err != nil {
- boot.logger.WithError(err).Error("createdb failed")
- cancel()
- return
- }
- close(ready)
- return
+ fail(boot.RunProgram(ctx, boot.tempdir, nil, nil, filepath.Join(bindir, "postgres"),
+ "-l", // enable ssl
+ "-D", datadir, // data dir
+ "-k", datadir, // socket dir
+ "-p", boot.cluster.PostgreSQL.Connection["port"],
+ ))
}()
- return boot.RunProgram(ctx, boot.tempdir, nil, nil, filepath.Join(bindir, "postgres"),
- "-l", // enable ssl
- "-D", datadir, // data dir
- "-k", datadir, // socket dir
- "-p", boot.cluster.PostgreSQL.Connection["port"],
- )
+ for {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ if exec.CommandContext(ctx, "pg_isready", "--timeout=10", "--host="+boot.cluster.PostgreSQL.Connection["host"], "--port="+port).Run() == nil {
+ break
+ }
+ time.Sleep(time.Second / 2)
+ }
+ db, err := sql.Open("postgres", arvados.PostgreSQLConnection{
+ "host": datadir,
+ "port": port,
+ "dbname": "postgres",
+ }.String())
+ if err != nil {
+ return fmt.Errorf("db open failed: %s", err)
+ }
+ defer db.Close()
+ conn, err := db.Conn(ctx)
+ if err != nil {
+ return fmt.Errorf("db conn failed: %s", err)
+ }
+ defer conn.Close()
+ _, err = conn.ExecContext(ctx, `CREATE USER `+pq.QuoteIdentifier(boot.cluster.PostgreSQL.Connection["user"])+` WITH SUPERUSER ENCRYPTED PASSWORD `+pq.QuoteLiteral(boot.cluster.PostgreSQL.Connection["password"]))
+ if err != nil {
+ return fmt.Errorf("createuser failed: %s", err)
+ }
+ _, err = conn.ExecContext(ctx, `CREATE DATABASE `+pq.QuoteIdentifier(boot.cluster.PostgreSQL.Connection["dbname"]))
+ if err != nil {
+ return fmt.Errorf("createdb failed: %s", err)
+ }
+ return nil
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package boot
+
+import (
+ "context"
+)
+
+// seedDatabase is a boot task that sets up the Rails API database by
+// running "rake db:setup" in services/api.
+type seedDatabase struct{}
+
+// String returns the task name.
+func (seedDatabase) String() string {
+	return "seedDatabase"
+}
+
+// Run waits for its dependencies and then runs
+// "bundle exec rake db:setup" in services/api, returning the first
+// error encountered. fail is not used: Run starts no background work.
+//
+// Besides PostgreSQL, this depends on the installPassenger task for
+// services/api: that task is what runs "bundle install" there, and
+// "bundle exec" cannot work until the bundle is installed. Without
+// that dependency the two tasks race.
+func (seedDatabase) Run(ctx context.Context, fail func(error), boot *Booter) error {
+	err := boot.wait(ctx, runPostgreSQL{}, installPassenger{src: "services/api"})
+	if err != nil {
+		return err
+	}
+	return boot.RunProgram(ctx, "services/api", nil, nil, "bundle", "exec", "rake", "db:setup")
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package boot
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "path/filepath"
+
+ "git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+)
+
+// runServiceCommand is a boot task that runs a service implemented
+// as a cmd.Handler inside this process.
+type runServiceCommand struct {
+	name    string      // task name; also passed to RunCommand as the program name
+	command cmd.Handler // handler to run
+	depends []bootTask  // tasks that must be ready before starting
+}
+
+// String returns the task name.
+func (runner runServiceCommand) String() string {
+	return runner.name
+}
+
+// Run waits for the task's dependencies, then starts the command in
+// a new goroutine and returns nil without waiting for it. When the
+// command returns, its exit code is reported through fail.
+//
+// If waiting for the dependencies fails (unknown task, or ctx is done
+// first), the command is not started and that error is returned.
+func (runner runServiceCommand) Run(ctx context.Context, fail func(error), boot *Booter) error {
+	if err := boot.wait(ctx, runner.depends...); err != nil {
+		return err
+	}
+	go func() {
+		// runner.command.RunCommand() doesn't have access to
+		// ctx, so it can't shut down by itself when the
+		// caller cancels. We just abandon it.
+		exitcode := runner.command.RunCommand(runner.name, []string{"-config", boot.configfile}, bytes.NewBuffer(nil), boot.Stderr, boot.Stderr)
+		fail(fmt.Errorf("exit code %d", exitcode))
+	}()
+	return nil
+}
+
+// runGoProgram is a boot task that builds the Go program in src with
+// "go install" and then runs it: once per entry in svc.InternalURLs
+// if there are any, otherwise once.
+type runGoProgram struct {
+	src     string          // program source dir, e.g., "services/keepstore"
+	svc     arvados.Service // optional; one process is started per InternalURLs entry
+	depends []bootTask      // tasks that must be ready before building
+}
+
+// String returns the task name: the last element of the source dir
+// path.
+func (runner runGoProgram) String() string {
+	_, basename := filepath.Split(runner.src)
+	return basename
+}
+
+// Run waits for the task's dependencies, runs "go install" in
+// runner.src, then starts the installed program in one or more
+// goroutines and returns nil without waiting for them. The result of
+// each process is reported through fail when it ends.
+//
+// If waiting for the dependencies fails, "go install" fails, or ctx
+// is done by the time the build finishes, nothing is started and the
+// error is returned.
+func (runner runGoProgram) Run(ctx context.Context, fail func(error), boot *Booter) error {
+	if err := boot.wait(ctx, runner.depends...); err != nil {
+		return err
+	}
+	// Don't go on to launch a stale or missing binary if the
+	// build failed.
+	if err := boot.RunProgram(ctx, runner.src, nil, nil, "go", "install"); err != nil {
+		return fmt.Errorf("go install %s: %s", runner.src, err)
+	}
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+	_, basename := filepath.Split(runner.src)
+	if len(runner.svc.InternalURLs) == 0 {
+		// Just run one
+		go func() {
+			fail(boot.RunProgram(ctx, boot.tempdir, nil, nil, basename))
+		}()
+		return nil
+	}
+	// Run one for each URL
+	for u := range runner.svc.InternalURLs {
+		u := u
+		go func() {
+			fail(boot.RunProgram(ctx, boot.tempdir, nil, []string{"ARVADOS_SERVICE_INTERNAL_URL=" + u.String()}, basename))
+		}()
+	}
+	return nil
+}