18699: boot.Supervisor support for multi-cluster config file.
authorTom Clegg <tom@curii.com>
Mon, 4 Apr 2022 14:18:07 +0000 (10:18 -0400)
committerTom Clegg <tom@curii.com>
Wed, 13 Apr 2022 17:27:49 +0000 (13:27 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/boot/cert.go
lib/boot/cmd.go
lib/boot/helpers.go
lib/boot/supervisor.go
lib/controller/integration_test.go
tools/sync-groups/federation_test.go

index 2b38dab053cd63d571a00bf4d791f90b328c979c..916f9f53b2af7b9109474c3e662b395174f982b9 100644 (file)
@@ -26,20 +26,8 @@ func (createCertificates) String() string {
 }
 
 func (createCertificates) Run(ctx context.Context, fail func(error), super *Supervisor) error {
-       var san string
-       if net.ParseIP(super.ListenHost) != nil {
-               san += fmt.Sprintf(",IP:%s", super.ListenHost)
-       } else {
-               san += fmt.Sprintf(",DNS:%s", super.ListenHost)
-       }
-       hostname, err := os.Hostname()
-       if err != nil {
-               return fmt.Errorf("hostname: %w", err)
-       }
-       san += ",DNS:" + hostname
-
        // Generate root key
-       err = super.RunProgram(ctx, super.tempdir, runOptions{}, "openssl", "genrsa", "-out", "rootCA.key", "4096")
+       err := super.RunProgram(ctx, super.tempdir, runOptions{}, "openssl", "genrsa", "-out", "rootCA.key", "4096")
        if err != nil {
                return err
        }
@@ -58,7 +46,20 @@ func (createCertificates) Run(ctx context.Context, fail func(error), super *Supe
        if err != nil {
                return err
        }
-       err = ioutil.WriteFile(filepath.Join(super.tempdir, "server.cfg"), append(defaultconf, []byte(fmt.Sprintf("\n[SAN]\nsubjectAltName=DNS:localhost,DNS:localhost.localdomain%s\n", san))...), 0644)
+       hostname, err := os.Hostname()
+       if err != nil {
+               return fmt.Errorf("hostname: %w", err)
+       }
+       san := "DNS:localhost,DNS:localhost.localdomain,DNS:" + hostname
+       if super.ListenHost == hostname || super.ListenHost == "localhost" {
+               // already have it
+       } else if net.ParseIP(super.ListenHost) != nil {
+               san += fmt.Sprintf(",IP:%s", super.ListenHost)
+       } else {
+               san += fmt.Sprintf(",DNS:%s", super.ListenHost)
+       }
+       conf := append(defaultconf, []byte(fmt.Sprintf("\n[SAN]\nsubjectAltName=%s\n", san))...)
+       err = ioutil.WriteFile(filepath.Join(super.tempdir, "server.cfg"), conf, 0644)
        if err != nil {
                return err
        }
index 9c3047a7b6c1f80b911e6c19042d62cc967a80f1..6a32ab142de79115c136e3e674936041af390bc5 100644 (file)
@@ -13,7 +13,6 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
-       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
 )
 
@@ -56,17 +55,17 @@ func (bcmd bootCommand) run(ctx context.Context, prog string, args []string, std
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()
        super := &Supervisor{
+               Stdin:  stdin,
                Stderr: stderr,
                logger: ctxlog.FromContext(ctx),
        }
 
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
-       loader := config.NewLoader(stdin, super.logger)
-       loader.SetupFlags(flags)
        versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
+       flags.StringVar(&super.ConfigPath, "config", "/etc/arvados/config.yml", "arvados config file `path`")
        flags.StringVar(&super.SourcePath, "source", ".", "arvados source tree `directory`")
        flags.StringVar(&super.ClusterType, "type", "production", "cluster `type`: development, test, or production")
-       flags.StringVar(&super.ListenHost, "listen-host", "localhost", "host name or interface address for service listeners")
+       flags.StringVar(&super.ListenHost, "listen-host", "localhost", "host name or interface address for external services, and internal services whose InternalURLs are not configured")
        flags.StringVar(&super.ControllerAddr, "controller-address", ":0", "desired controller address, `host:port` or `:port`")
        flags.StringVar(&super.Workbench2Source, "workbench2-source", "../arvados-workbench2", "path to arvados-workbench2 source tree")
        flags.BoolVar(&super.NoWorkbench1, "no-workbench1", false, "do not run workbench1")
@@ -87,13 +86,7 @@ func (bcmd bootCommand) run(ctx context.Context, prog string, args []string, std
                return fmt.Errorf("cluster type must be 'development', 'test', or 'production'")
        }
 
-       loader.SkipAPICalls = true
-       cfg, err := loader.Load()
-       if err != nil {
-               return err
-       }
-
-       super.Start(ctx, cfg, loader.Path)
+       super.Start(ctx)
        defer super.Stop()
 
        var timer *time.Timer
@@ -101,17 +94,21 @@ func (bcmd bootCommand) run(ctx context.Context, prog string, args []string, std
                timer = time.AfterFunc(*timeout, super.Stop)
        }
 
-       url, ok := super.WaitReady()
+       ok := super.WaitReady()
        if timer != nil && !timer.Stop() {
                return errors.New("boot timed out")
        } else if !ok {
                super.logger.Error("boot failed")
        } else {
-               // Write controller URL to stdout. Nothing else goes
-               // to stdout, so this provides an easy way for a
-               // calling script to discover the controller URL when
-               // everything is ready.
-               fmt.Fprintln(stdout, url)
+               // Write each cluster's controller URL to stdout.
+               // Nothing else goes to stdout, so this allows a
+               // calling script to determine when the cluster is
+               // ready to use, and the controller's host:port (which
+               // may have been dynamically assigned depending on
+               // config/options).
+               for _, cc := range super.Clusters() {
+                       fmt.Fprintln(stdout, cc.Services.Controller.ExternalURL)
+               }
                if *shutdown {
                        super.Stop()
                        // Wait for children to exit. Don't report the
index 731fc56c83ac582260b06fce87cf648dd7c5d686..77036e934017efd2f9cfd44dba23b36f8dcbc522 100644 (file)
@@ -9,73 +9,24 @@ import (
        "net/url"
 
        "git.arvados.org/arvados.git/lib/controller/rpc"
-       "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/auth"
-       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "gopkg.in/check.v1"
 )
 
-// TestCluster stores a working test cluster data
-type TestCluster struct {
-       Super         Supervisor
-       Config        arvados.Config
-       ControllerURL *url.URL
-       ClusterID     string
-}
-
-type logger struct {
-       loggerfunc func(...interface{})
-}
-
-func (l logger) Log(args ...interface{}) {
-       l.loggerfunc(args)
-}
-
-// NewTestCluster loads the provided configuration, and sets up a test cluster
-// ready for being started.
-func NewTestCluster(srcPath, clusterID string, cfg *arvados.Config, listenHost string, logWriter func(...interface{})) *TestCluster {
-       return &TestCluster{
-               Super: Supervisor{
-                       SourcePath:           srcPath,
-                       ClusterType:          "test",
-                       ListenHost:           listenHost,
-                       ControllerAddr:       ":0",
-                       OwnTemporaryDatabase: true,
-                       Stderr: &service.LogPrefixer{
-                               Writer: ctxlog.LogWriter(logWriter),
-                               Prefix: []byte("[" + clusterID + "] ")},
-               },
-               Config:    *cfg,
-               ClusterID: clusterID,
-       }
-}
-
-// Start the test cluster.
-func (tc *TestCluster) Start() {
-       tc.Super.Start(context.Background(), &tc.Config, "-")
-}
-
-// WaitReady waits for all components to report healthy, and finishes setting
-// up the TestCluster struct.
-func (tc *TestCluster) WaitReady() bool {
-       au, ok := tc.Super.WaitReady()
-       if !ok {
-               return ok
-       }
-       u := url.URL(*au)
-       tc.ControllerURL = &u
-       return ok
-}
-
 // ClientsWithToken returns Context, Arvados.Client and keepclient structs
 // initialized to connect to the cluster with the supplied Arvados token.
-func (tc *TestCluster) ClientsWithToken(token string) (context.Context, *arvados.Client, *keepclient.KeepClient) {
-       cl := tc.Config.Clusters[tc.ClusterID]
-       ctx := auth.NewContext(context.Background(), auth.NewCredentials(token))
-       ac, err := arvados.NewClientFromConfig(&cl)
+func (super *Supervisor) ClientsWithToken(clusterID, token string) (context.Context, *arvados.Client, *keepclient.KeepClient) {
+       cl := super.cluster
+       if super.children != nil {
+               cl = super.children[clusterID].cluster
+       } else if clusterID != cl.ClusterID {
+               panic("bad clusterID " + clusterID)
+       }
+       ctx := auth.NewContext(super.ctx, auth.NewCredentials(token))
+       ac, err := arvados.NewClientFromConfig(cl)
        if err != nil {
                panic(err)
        }
@@ -92,7 +43,7 @@ func (tc *TestCluster) ClientsWithToken(token string) (context.Context, *arvados
 // initialize clients with the API token, set up the user and
 // optionally activate the user.  Return client structs for
 // communicating with the cluster on behalf of the 'example' user.
-func (tc *TestCluster) UserClients(rootctx context.Context, c *check.C, conn *rpc.Conn, authEmail string, activate bool) (context.Context, *arvados.Client, *keepclient.KeepClient, arvados.User) {
+func (super *Supervisor) UserClients(clusterID string, rootctx context.Context, c *check.C, conn *rpc.Conn, authEmail string, activate bool) (context.Context, *arvados.Client, *keepclient.KeepClient, arvados.User) {
        login, err := conn.UserSessionCreate(rootctx, rpc.UserSessionCreateOptions{
                ReturnTo: ",https://example.com",
                AuthInfo: rpc.UserSessionAuthInfo{
@@ -107,7 +58,7 @@ func (tc *TestCluster) UserClients(rootctx context.Context, c *check.C, conn *rp
        c.Assert(err, check.IsNil)
        userToken := redirURL.Query().Get("api_token")
        c.Logf("user token: %q", userToken)
-       ctx, ac, kc := tc.ClientsWithToken(userToken)
+       ctx, ac, kc := super.ClientsWithToken(clusterID, userToken)
        user, err := conn.UserGetCurrent(ctx, arvados.GetOptions{})
        c.Assert(err, check.IsNil)
        _, err = conn.UserSetup(rootctx, arvados.UserSetupOptions{UUID: user.UUID})
@@ -127,18 +78,19 @@ func (tc *TestCluster) UserClients(rootctx context.Context, c *check.C, conn *rp
 
 // RootClients returns Context, arvados.Client and keepclient structs initialized
 // to communicate with the cluster as the system root user.
-func (tc *TestCluster) RootClients() (context.Context, *arvados.Client, *keepclient.KeepClient) {
-       return tc.ClientsWithToken(tc.Config.Clusters[tc.ClusterID].SystemRootToken)
+func (super *Supervisor) RootClients(clusterID string) (context.Context, *arvados.Client, *keepclient.KeepClient) {
+       return super.ClientsWithToken(clusterID, super.Cluster(clusterID).SystemRootToken)
 }
 
 // AnonymousClients returns Context, arvados.Client and keepclient structs initialized
 // to communicate with the cluster as the anonymous user.
-func (tc *TestCluster) AnonymousClients() (context.Context, *arvados.Client, *keepclient.KeepClient) {
-       return tc.ClientsWithToken(tc.Config.Clusters[tc.ClusterID].Users.AnonymousUserToken)
+func (super *Supervisor) AnonymousClients(clusterID string) (context.Context, *arvados.Client, *keepclient.KeepClient) {
+       return super.ClientsWithToken(clusterID, super.Cluster(clusterID).Users.AnonymousUserToken)
 }
 
 // Conn gets rpc connection struct initialized to communicate with the
 // specified cluster.
-func (tc *TestCluster) Conn() *rpc.Conn {
-       return rpc.NewConn(tc.ClusterID, tc.ControllerURL, true, rpc.PassthroughTokenProvider)
+func (super *Supervisor) Conn(clusterID string) *rpc.Conn {
+       controllerURL := url.URL(super.Cluster(clusterID).Services.Controller.ExternalURL)
+       return rpc.NewConn(clusterID, &controllerURL, true, rpc.PassthroughTokenProvider)
 }
index 7daceccb93d04e0498f0eabaecdaabeb7a045de3..b56e5ac725f773071b0643504bd4dbe4e6a0d811 100644 (file)
@@ -37,25 +37,47 @@ import (
 )
 
 type Supervisor struct {
-       SourcePath           string // e.g., /home/username/src/arvados
-       SourceVersion        string // e.g., acbd1324...
-       ClusterType          string // e.g., production
-       ListenHost           string // e.g., localhost
-       ControllerAddr       string // e.g., 127.0.0.1:8000
-       Workbench2Source     string // e.g., /home/username/src/arvados-workbench2
+       // Config file location like "/etc/arvados/config.yml", or "-"
+       // to read from Stdin (see below).
+       ConfigPath string
+       // Literal config file (useful for test suites). If non-empty,
+       // this is used instead of ConfigPath.
+       ConfigYAML string
+       // Path to arvados source tree. Only used for dev/test
+       // clusters.
+       SourcePath string
+       // Version number to build into binaries. Only used for
+       // dev/test clusters.
+       SourceVersion string
+       // "production", "development", or "test".
+       ClusterType string
+       // Listening address for external services, and internal
+       // services whose InternalURLs are not explicitly configured.
+       // If blank, listen on the configured controller ExternalURL
+       // host; if that is also blank, listen on all addresses
+       // (0.0.0.0).
+       ListenHost string
+       // Default host:port for controller ExternalURL if not
+       // explicitly configured in config file. If blank, use a
+       // random port on ListenHost.
+       ControllerAddr string
+       // Path to arvados-workbench2 source tree checkout.
+       Workbench2Source     string
        NoWorkbench1         bool
        NoWorkbench2         bool
        OwnTemporaryDatabase bool
+       Stdin                io.Reader
        Stderr               io.Writer
 
-       logger  logrus.FieldLogger
-       cluster *arvados.Cluster
+       logger   logrus.FieldLogger
+       cluster  *arvados.Cluster       // nil if this is a multi-cluster supervisor
+       children map[string]*Supervisor // nil if this is a single-cluster supervisor
 
        ctx           context.Context
        cancel        context.CancelFunc
-       done          chan struct{} // closed when child procs/services have shut down
-       err           error         // error that caused shutdown (valid when done is closed)
-       healthChecker *health.Aggregator
+       done          chan struct{}      // closed when child procs/services have shut down
+       err           error              // error that caused shutdown (valid when done is closed)
+       healthChecker *health.Aggregator // nil if this is a multi-cluster supervisor, or still booting
        tasksReady    map[string]chan bool
        waitShutdown  sync.WaitGroup
 
@@ -66,73 +88,161 @@ type Supervisor struct {
        environ    []string // for child processes
 }
 
-func (super *Supervisor) Cluster() *arvados.Cluster { return super.cluster }
+func (super *Supervisor) Clusters() map[string]*arvados.Cluster {
+       m := map[string]*arvados.Cluster{}
+       if super.cluster != nil {
+               m[super.cluster.ClusterID] = super.cluster
+       }
+       for id, super2 := range super.children {
+               m[id] = super2.Cluster("")
+       }
+       return m
+}
+
+func (super *Supervisor) Cluster(id string) *arvados.Cluster {
+       if super.children != nil {
+               return super.children[id].Cluster(id)
+       } else {
+               return super.cluster
+       }
+}
 
-func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) {
+func (super *Supervisor) Start(ctx context.Context) {
+       super.logger = ctxlog.FromContext(ctx)
        super.ctx, super.cancel = context.WithCancel(ctx)
        super.done = make(chan struct{})
 
+       sigch := make(chan os.Signal)
+       signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
+       defer signal.Stop(sigch)
        go func() {
-               defer close(super.done)
+               for sig := range sigch {
+                       super.logger.WithField("signal", sig).Info("caught signal")
+                       if super.err == nil {
+                               super.err = fmt.Errorf("caught signal %s", sig)
+                       }
+                       super.cancel()
+               }
+       }()
 
-               sigch := make(chan os.Signal)
-               signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
-               defer signal.Stop(sigch)
-               go func() {
-                       for sig := range sigch {
-                               super.logger.WithField("signal", sig).Info("caught signal")
-                               if super.err == nil {
-                                       super.err = fmt.Errorf("caught signal %s", sig)
-                               }
-                               super.cancel()
+       hupch := make(chan os.Signal)
+       signal.Notify(hupch, syscall.SIGHUP)
+       defer signal.Stop(hupch)
+       go func() {
+               for sig := range hupch {
+                       super.logger.WithField("signal", sig).Info("caught signal")
+                       if super.err == nil {
+                               super.err = errNeedConfigReload
                        }
-               }()
+                       super.cancel()
+               }
+       }()
+
+       loaderStdin := super.Stdin
+       if super.ConfigYAML != "" {
+               loaderStdin = bytes.NewBufferString(super.ConfigYAML)
+       }
+       loader := config.NewLoader(loaderStdin, super.logger)
+       loader.SkipLegacy = true
+       loader.SkipAPICalls = true
+       loader.Path = super.ConfigPath
+       if super.ConfigYAML != "" {
+               loader.Path = "-"
+       }
+       cfg, err := loader.Load()
+       if err != nil {
+               super.err = err
+               close(super.done)
+               super.cancel()
+               return
+       }
+
+       if super.ConfigPath != "" && super.ConfigPath != "-" && cfg.AutoReloadConfig {
+               go watchConfig(super.ctx, super.logger, super.ConfigPath, copyConfig(cfg), func() {
+                       if super.err == nil {
+                               super.err = errNeedConfigReload
+                       }
+                       super.cancel()
+               })
+       }
 
-               hupch := make(chan os.Signal)
-               signal.Notify(hupch, syscall.SIGHUP)
-               defer signal.Stop(hupch)
+       if len(cfg.Clusters) > 1 {
+               super.startFederation(cfg)
                go func() {
-                       for sig := range hupch {
-                               super.logger.WithField("signal", sig).Info("caught signal")
+                       defer super.cancel()
+                       defer close(super.done)
+                       for _, super2 := range super.children {
+                               err := super2.Wait()
                                if super.err == nil {
-                                       super.err = errNeedConfigReload
+                                       super.err = err
                                }
-                               super.cancel()
                        }
                }()
-
-               if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig {
-                       go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() {
+       } else {
+               go func() {
+                       defer super.cancel()
+                       defer close(super.done)
+                       super.cluster, super.err = cfg.GetCluster("")
+                       if super.err != nil {
+                               return
+                       }
+                       err := super.runCluster()
+                       if err != nil {
+                               super.logger.WithError(err).Info("supervisor shut down")
                                if super.err == nil {
-                                       super.err = errNeedConfigReload
+                                       super.err = err
                                }
-                               super.cancel()
-                       })
-               }
-
-               err := super.run(cfg)
-               if err != nil {
-                       super.logger.WithError(err).Warn("supervisor shut down")
-                       if super.err == nil {
-                               super.err = err
                        }
-               }
-       }()
+               }()
+       }
 }
 
+// Wait returns when all child processes and goroutines have exited.
 func (super *Supervisor) Wait() error {
        <-super.done
        return super.err
 }
 
-func (super *Supervisor) run(cfg *arvados.Config) error {
-       defer super.cancel()
+// startFederation starts a child Supervisor for each cluster in the
+// given config. Each is a copy of the original/parent with the
+// original config reduced to a single cluster.
+func (super *Supervisor) startFederation(cfg *arvados.Config) {
+       super.children = map[string]*Supervisor{}
+       for id, cc := range cfg.Clusters {
+               super2 := *super
+               yaml, err := json.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{id: cc}})
+               if err != nil {
+                       panic(fmt.Sprintf("json.Marshal partial config: %s", err))
+               }
+               super2.ConfigYAML = string(yaml)
+               super2.ConfigPath = "-"
+               super2.children = nil
+
+               if super2.ClusterType == "test" {
+                       super2.Stderr = &service.LogPrefixer{
+                               Writer: super.Stderr,
+                               Prefix: []byte("[" + id + "] "),
+                       }
+               }
+               super2.Start(super.ctx)
+               super.children[id] = &super2
+       }
+}
 
+func (super *Supervisor) runCluster() error {
        cwd, err := os.Getwd()
        if err != nil {
                return err
        }
-       if !strings.HasPrefix(super.SourcePath, "/") {
+       if super.ClusterType == "test" && super.SourcePath == "" {
+               // When invoked by test suite, default to current
+               // source tree
+               buf, err := exec.Command("git", "rev-parse", "--show-toplevel").CombinedOutput()
+               if err != nil {
+                       return fmt.Errorf("git rev-parse: %w", err)
+               }
+               super.SourcePath = strings.TrimSuffix(string(buf), "\n")
+       } else if !strings.HasPrefix(super.SourcePath, "/") {
                super.SourcePath = filepath.Join(cwd, super.SourcePath)
        }
        super.SourcePath, err = filepath.EvalSymlinks(super.SourcePath)
@@ -140,6 +250,18 @@ func (super *Supervisor) run(cfg *arvados.Config) error {
                return err
        }
 
+       if super.ListenHost == "" {
+               if urlhost := super.cluster.Services.Controller.ExternalURL.Host; urlhost != "" {
+                       if h, _, _ := net.SplitHostPort(urlhost); h != "" {
+                               super.ListenHost = h
+                       } else {
+                               super.ListenHost = urlhost
+                       }
+               } else {
+                       super.ListenHost = "0.0.0.0"
+               }
+       }
+
        // Choose bin and temp dirs: /var/lib/arvados/... in
        // production, transient tempdir otherwise.
        if super.ClusterType == "production" {
@@ -164,7 +286,7 @@ func (super *Supervisor) run(cfg *arvados.Config) error {
 
        // Fill in any missing config keys, and write the resulting
        // config in the temp dir for child services to use.
-       err = super.autofillConfig(cfg)
+       err = super.autofillConfig()
        if err != nil {
                return err
        }
@@ -173,7 +295,9 @@ func (super *Supervisor) run(cfg *arvados.Config) error {
                return err
        }
        defer conffile.Close()
-       err = json.NewEncoder(conffile).Encode(cfg)
+       err = json.NewEncoder(conffile).Encode(arvados.Config{
+               Clusters: map[string]arvados.Cluster{
+                       super.cluster.ClusterID: *super.cluster}})
        if err != nil {
                return err
        }
@@ -193,10 +317,6 @@ func (super *Supervisor) run(cfg *arvados.Config) error {
                super.prependEnv("PATH", super.tempdir+"/bin:")
        }
 
-       super.cluster, err = cfg.GetCluster("")
-       if err != nil {
-               return err
-       }
        // Now that we have the config, replace the bootstrap logger
        // with a new one according to the logging config.
        loglevel := super.cluster.SystemLogs.LogLevel
@@ -307,36 +427,60 @@ func (super *Supervisor) run(cfg *arvados.Config) error {
 }
 
 func (super *Supervisor) wait(ctx context.Context, tasks ...supervisedTask) error {
+       ticker := time.NewTicker(15 * time.Second)
+       defer ticker.Stop()
        for _, task := range tasks {
                ch, ok := super.tasksReady[task.String()]
                if !ok {
                        return fmt.Errorf("no such task: %s", task)
                }
                super.logger.WithField("task", task.String()).Info("waiting")
-               select {
-               case <-ch:
-                       super.logger.WithField("task", task.String()).Info("ready")
-               case <-ctx.Done():
-                       super.logger.WithField("task", task.String()).Info("task was never ready")
-                       return ctx.Err()
+               for {
+                       select {
+                       case <-ch:
+                               super.logger.WithField("task", task.String()).Info("ready")
+                       case <-ctx.Done():
+                               super.logger.WithField("task", task.String()).Info("task was never ready")
+                               return ctx.Err()
+                       case <-ticker.C:
+                               super.logger.WithField("task", task.String()).Info("still waiting...")
+                               continue
+                       }
+                       break
                }
        }
        return nil
 }
 
+// Stop shuts down all child processes and goroutines, and returns
+// when all of them have exited.
 func (super *Supervisor) Stop() {
        super.cancel()
        <-super.done
 }
 
-func (super *Supervisor) WaitReady() (*arvados.URL, bool) {
+// WaitReady waits for the cluster(s) to be ready to handle requests,
+// then returns true. If startup fails, it returns false.
+func (super *Supervisor) WaitReady() bool {
+       if super.children != nil {
+               for id, super2 := range super.children {
+                       super.logger.Infof("waiting for %s to be ready", id)
+                       if !super2.WaitReady() {
+                               super.logger.Infof("%s startup failed", id)
+                               return false
+                       }
+                       super.logger.Infof("%s is ready", id)
+               }
+               super.logger.Info("all clusters are ready")
+               return true
+       }
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for waiting := "all"; waiting != ""; {
                select {
                case <-ticker.C:
                case <-super.ctx.Done():
-                       return nil, false
+                       return false
                }
                if super.healthChecker == nil {
                        // not set up yet
@@ -358,8 +502,7 @@ func (super *Supervisor) WaitReady() (*arvados.URL, bool) {
                        super.logger.WithField("targets", waiting[1:]).Info("waiting")
                }
        }
-       u := super.cluster.Services.Controller.ExternalURL
-       return &u, true
+       return true
 }
 
 func (super *Supervisor) prependEnv(key, prepend string) {
@@ -631,11 +774,7 @@ func (super *Supervisor) RunProgram(ctx context.Context, dir string, opts runOpt
        return nil
 }
 
-func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
-       cluster, err := cfg.GetCluster("")
-       if err != nil {
-               return err
-       }
+func (super *Supervisor) autofillConfig() error {
        usedPort := map[string]bool{}
        nextPort := func(host string) (string, error) {
                for {
@@ -653,39 +792,39 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
                        return port, nil
                }
        }
-       if cluster.Services.Controller.ExternalURL.Host == "" {
+       if super.cluster.Services.Controller.ExternalURL.Host == "" {
                h, p, err := net.SplitHostPort(super.ControllerAddr)
-               if err != nil {
-                       return fmt.Errorf("SplitHostPort(ControllerAddr): %w", err)
+               if err != nil && super.ControllerAddr != "" {
+                       return fmt.Errorf("SplitHostPort(ControllerAddr %q): %w", super.ControllerAddr, err)
                }
                if h == "" {
                        h = super.ListenHost
                }
-               if p == "0" {
+               if p == "0" || p == "" {
                        p, err = nextPort(h)
                        if err != nil {
                                return err
                        }
                }
-               cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"}
+               super.cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"}
        }
-       u := url.URL(cluster.Services.Controller.ExternalURL)
+       u := url.URL(super.cluster.Services.Controller.ExternalURL)
        defaultExtHost := u.Hostname()
        for _, svc := range []*arvados.Service{
-               &cluster.Services.Controller,
-               &cluster.Services.DispatchCloud,
-               &cluster.Services.GitHTTP,
-               &cluster.Services.Health,
-               &cluster.Services.Keepproxy,
-               &cluster.Services.Keepstore,
-               &cluster.Services.RailsAPI,
-               &cluster.Services.WebDAV,
-               &cluster.Services.WebDAVDownload,
-               &cluster.Services.Websocket,
-               &cluster.Services.Workbench1,
-               &cluster.Services.Workbench2,
+               &super.cluster.Services.Controller,
+               &super.cluster.Services.DispatchCloud,
+               &super.cluster.Services.GitHTTP,
+               &super.cluster.Services.Health,
+               &super.cluster.Services.Keepproxy,
+               &super.cluster.Services.Keepstore,
+               &super.cluster.Services.RailsAPI,
+               &super.cluster.Services.WebDAV,
+               &super.cluster.Services.WebDAVDownload,
+               &super.cluster.Services.Websocket,
+               &super.cluster.Services.Workbench1,
+               &super.cluster.Services.Workbench2,
        } {
-               if svc == &cluster.Services.DispatchCloud && super.ClusterType == "test" {
+               if svc == &super.cluster.Services.DispatchCloud && super.ClusterType == "test" {
                        continue
                }
                if svc.ExternalURL.Host == "" {
@@ -694,21 +833,21 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
                                return err
                        }
                        host := net.JoinHostPort(defaultExtHost, port)
-                       if svc == &cluster.Services.Controller ||
-                               svc == &cluster.Services.GitHTTP ||
-                               svc == &cluster.Services.Health ||
-                               svc == &cluster.Services.Keepproxy ||
-                               svc == &cluster.Services.WebDAV ||
-                               svc == &cluster.Services.WebDAVDownload ||
-                               svc == &cluster.Services.Workbench1 ||
-                               svc == &cluster.Services.Workbench2 {
+                       if svc == &super.cluster.Services.Controller ||
+                               svc == &super.cluster.Services.GitHTTP ||
+                               svc == &super.cluster.Services.Health ||
+                               svc == &super.cluster.Services.Keepproxy ||
+                               svc == &super.cluster.Services.WebDAV ||
+                               svc == &super.cluster.Services.WebDAVDownload ||
+                               svc == &super.cluster.Services.Workbench1 ||
+                               svc == &super.cluster.Services.Workbench2 {
                                svc.ExternalURL = arvados.URL{Scheme: "https", Host: host, Path: "/"}
-                       } else if svc == &cluster.Services.Websocket {
+                       } else if svc == &super.cluster.Services.Websocket {
                                svc.ExternalURL = arvados.URL{Scheme: "wss", Host: host, Path: "/websocket"}
                        }
                }
-               if super.NoWorkbench1 && svc == &cluster.Services.Workbench1 ||
-                       super.NoWorkbench2 && svc == &cluster.Services.Workbench2 {
+               if super.NoWorkbench1 && svc == &super.cluster.Services.Workbench1 ||
+                       super.NoWorkbench2 && svc == &super.cluster.Services.Workbench2 {
                        // When workbench1 is disabled, it gets an
                        // ExternalURL (so we have a valid listening
                        // port to write in our Nginx config) but no
@@ -728,26 +867,26 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
                }
        }
        if super.ClusterType != "production" {
-               if cluster.SystemRootToken == "" {
-                       cluster.SystemRootToken = randomHexString(64)
+               if super.cluster.SystemRootToken == "" {
+                       super.cluster.SystemRootToken = randomHexString(64)
                }
-               if cluster.ManagementToken == "" {
-                       cluster.ManagementToken = randomHexString(64)
+               if super.cluster.ManagementToken == "" {
+                       super.cluster.ManagementToken = randomHexString(64)
                }
-               if cluster.Collections.BlobSigningKey == "" {
-                       cluster.Collections.BlobSigningKey = randomHexString(64)
+               if super.cluster.Collections.BlobSigningKey == "" {
+                       super.cluster.Collections.BlobSigningKey = randomHexString(64)
                }
-               if cluster.Users.AnonymousUserToken == "" {
-                       cluster.Users.AnonymousUserToken = randomHexString(64)
+               if super.cluster.Users.AnonymousUserToken == "" {
+                       super.cluster.Users.AnonymousUserToken = randomHexString(64)
                }
-               if cluster.Containers.DispatchPrivateKey == "" {
+               if super.cluster.Containers.DispatchPrivateKey == "" {
                        buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch"))
                        if err != nil {
                                return err
                        }
-                       cluster.Containers.DispatchPrivateKey = string(buf)
+                       super.cluster.Containers.DispatchPrivateKey = string(buf)
                }
-               cluster.TLS.Insecure = true
+               super.cluster.TLS.Insecure = true
        }
        if super.ClusterType == "test" {
                // Add a second keepstore process.
@@ -756,13 +895,13 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
                        return err
                }
                host := net.JoinHostPort(super.ListenHost, port)
-               cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: host, Path: "/"}] = arvados.ServiceInstance{}
+               super.cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: host, Path: "/"}] = arvados.ServiceInstance{}
 
                // Create a directory-backed volume for each keepstore
                // process.
-               cluster.Volumes = map[string]arvados.Volume{}
-               for url := range cluster.Services.Keepstore.InternalURLs {
-                       volnum := len(cluster.Volumes)
+               super.cluster.Volumes = map[string]arvados.Volume{}
+               for url := range super.cluster.Services.Keepstore.InternalURLs {
+                       volnum := len(super.cluster.Volumes)
                        datadir := fmt.Sprintf("%s/keep%d.data", super.tempdir, volnum)
                        if _, err = os.Stat(datadir + "/."); err == nil {
                        } else if !os.IsNotExist(err) {
@@ -770,7 +909,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
                        } else if err = os.Mkdir(datadir, 0755); err != nil {
                                return err
                        }
-                       cluster.Volumes[fmt.Sprintf(cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{
+                       super.cluster.Volumes[fmt.Sprintf(super.cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{
                                Driver:           "Directory",
                                DriverParameters: json.RawMessage(fmt.Sprintf(`{"Root":%q}`, datadir)),
                                AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
@@ -783,7 +922,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
                                },
                        }
                }
-               cluster.StorageClasses = map[string]arvados.StorageClassConfig{
+               super.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
                        "default": {Default: true},
                        "foo":     {},
                        "bar":     {},
@@ -794,7 +933,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
                if err != nil {
                        return err
                }
-               cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{
+               super.cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{
                        "client_encoding": "utf8",
                        "host":            "localhost",
                        "port":            port,
@@ -803,8 +942,6 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
                        "password":        "insecure_arvados_test",
                }
        }
-
-       cfg.Clusters[cluster.ClusterID] = *cluster
        return nil
 }
 
@@ -876,6 +1013,7 @@ func availablePort(host string) (string, error) {
 // Try to connect to addr until it works, then close ch. Give up if
 // ctx cancels.
 func waitForConnect(ctx context.Context, addr string) error {
+       ctxlog.FromContext(ctx).WithField("addr", addr).Info("waitForConnect")
        dialer := net.Dialer{Timeout: time.Second}
        for ctx.Err() == nil {
                conn, err := dialer.DialContext(ctx, "tcp", addr)
index 50cf89c0d485156ac2e09c6e90934014b57bb7e3..44be17c77ffb1d99e4fbeafcbe2a01dd30a82b60 100644 (file)
@@ -17,13 +17,12 @@ import (
        "net/http"
        "os"
        "os/exec"
-       "path/filepath"
        "strconv"
        "strings"
        "sync"
+       "time"
 
        "git.arvados.org/arvados.git/lib/boot"
-       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
@@ -34,13 +33,11 @@ import (
 var _ = check.Suite(&IntegrationSuite{})
 
 type IntegrationSuite struct {
-       testClusters map[string]*boot.TestCluster
+       super        *boot.Supervisor
        oidcprovider *arvadostest.OIDCProvider
 }
 
 func (s *IntegrationSuite) SetUpSuite(c *check.C) {
-       cwd, _ := os.Getwd()
-
        s.oidcprovider = arvadostest.NewOIDCProvider(c)
        s.oidcprovider.AuthEmail = "user@example.com"
        s.oidcprovider.AuthEmailVerified = true
@@ -48,13 +45,8 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) {
        s.oidcprovider.ValidClientID = "clientid"
        s.oidcprovider.ValidClientSecret = "clientsecret"
 
-       s.testClusters = map[string]*boot.TestCluster{
-               "z1111": nil,
-               "z2222": nil,
-               "z3333": nil,
-       }
        hostport := map[string]string{}
-       for id := range s.testClusters {
+       for _, id := range []string{"z1111", "z2222", "z3333"} {
                hostport[id] = func() string {
                        // TODO: Instead of expecting random ports on
                        // 127.0.0.11, 22, 33 to be race-safe, try
@@ -68,8 +60,9 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) {
                        return "127.0.0." + id[3:] + ":" + port
                }()
        }
-       for id := range s.testClusters {
-               yaml := `Clusters:
+       yaml := "Clusters:\n"
+       for id := range hostport {
+               yaml += `
   ` + id + `:
     Services:
       Controller:
@@ -124,37 +117,35 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) {
       LoginCluster: z1111
 `
                }
-
-               loader := config.NewLoader(bytes.NewBufferString(yaml), ctxlog.TestLogger(c))
-               loader.Path = "-"
-               loader.SkipLegacy = true
-               loader.SkipAPICalls = true
-               cfg, err := loader.Load()
-               c.Assert(err, check.IsNil)
-               tc := boot.NewTestCluster(
-                       filepath.Join(cwd, "..", ".."),
-                       id, cfg, "127.0.0."+id[3:], c.Log)
-               tc.Super.NoWorkbench1 = true
-               tc.Super.NoWorkbench2 = true
-               tc.Start()
-               s.testClusters[id] = tc
        }
-       for _, tc := range s.testClusters {
-               ok := tc.WaitReady()
-               c.Assert(ok, check.Equals, true)
+       s.super = &boot.Supervisor{
+               ClusterType:          "test",
+               ConfigYAML:           yaml,
+               Stderr:               ctxlog.LogWriter(c.Log),
+               NoWorkbench1:         true,
+               NoWorkbench2:         true,
+               OwnTemporaryDatabase: true,
        }
+
+       // Give up if startup takes longer than 3m
+       timeout := time.AfterFunc(3*time.Minute, s.super.Stop)
+       defer timeout.Stop()
+       s.super.Start(context.Background())
+       ok := s.super.WaitReady()
+       c.Assert(ok, check.Equals, true)
 }
 
 func (s *IntegrationSuite) TearDownSuite(c *check.C) {
-       for _, c := range s.testClusters {
-               c.Super.Stop()
+       if s.super != nil {
+               s.super.Stop()
+               s.super.Wait()
        }
 }
 
 func (s *IntegrationSuite) TestDefaultStorageClassesOnCollections(c *check.C) {
-       conn := s.testClusters["z1111"].Conn()
-       rootctx, _, _ := s.testClusters["z1111"].RootClients()
-       userctx, _, kc, _ := s.testClusters["z1111"].UserClients(rootctx, c, conn, s.oidcprovider.AuthEmail, true)
+       conn := s.super.Conn("z1111")
+       rootctx, _, _ := s.super.RootClients("z1111")
+       userctx, _, kc, _ := s.super.UserClients("z1111", rootctx, c, conn, s.oidcprovider.AuthEmail, true)
        c.Assert(len(kc.DefaultStorageClasses) > 0, check.Equals, true)
        coll, err := conn.CollectionCreate(userctx, arvados.CreateOptions{})
        c.Assert(err, check.IsNil)
@@ -162,10 +153,10 @@ func (s *IntegrationSuite) TestDefaultStorageClassesOnCollections(c *check.C) {
 }
 
 func (s *IntegrationSuite) TestGetCollectionByPDH(c *check.C) {
-       conn1 := s.testClusters["z1111"].Conn()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       conn3 := s.testClusters["z3333"].Conn()
-       userctx1, ac1, kc1, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       conn3 := s.super.Conn("z3333")
+       userctx1, ac1, kc1, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
 
        // Create the collection to find its PDH (but don't save it
        // anywhere yet)
@@ -201,11 +192,11 @@ func (s *IntegrationSuite) TestGetCollectionByPDH(c *check.C) {
 
 // Tests bug #18004
 func (s *IntegrationSuite) TestRemoteUserAndTokenCacheRace(c *check.C) {
-       conn1 := s.testClusters["z1111"].Conn()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       rootctx2, _, _ := s.testClusters["z2222"].RootClients()
-       conn2 := s.testClusters["z2222"].Conn()
-       userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user2@example.com", true)
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       rootctx2, _, _ := s.super.RootClients("z2222")
+       conn2 := s.super.Conn("z2222")
+       userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user2@example.com", true)
 
        var wg1, wg2 sync.WaitGroup
        creqs := 100
@@ -250,13 +241,13 @@ func (s *IntegrationSuite) TestS3WithFederatedToken(c *check.C) {
 
        testText := "IntegrationSuite.TestS3WithFederatedToken"
 
-       conn1 := s.testClusters["z1111"].Conn()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       userctx1, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
-       conn3 := s.testClusters["z3333"].Conn()
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       userctx1, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+       conn3 := s.super.Conn("z3333")
 
        createColl := func(clusterID string) arvados.Collection {
-               _, ac, kc := s.testClusters[clusterID].ClientsWithToken(ac1.AuthToken)
+               _, ac, kc := s.super.ClientsWithToken(clusterID, ac1.AuthToken)
                var coll arvados.Collection
                fs, err := coll.FileSystem(ac, kc)
                c.Assert(err, check.IsNil)
@@ -268,7 +259,7 @@ func (s *IntegrationSuite) TestS3WithFederatedToken(c *check.C) {
                c.Assert(err, check.IsNil)
                mtxt, err := fs.MarshalManifest(".")
                c.Assert(err, check.IsNil)
-               coll, err = s.testClusters[clusterID].Conn().CollectionCreate(userctx1, arvados.CreateOptions{Attrs: map[string]interface{}{
+               coll, err = s.super.Conn(clusterID).CollectionCreate(userctx1, arvados.CreateOptions{Attrs: map[string]interface{}{
                        "manifest_text": mtxt,
                }})
                c.Assert(err, check.IsNil)
@@ -316,10 +307,10 @@ func (s *IntegrationSuite) TestS3WithFederatedToken(c *check.C) {
 }
 
 func (s *IntegrationSuite) TestGetCollectionAsAnonymous(c *check.C) {
-       conn1 := s.testClusters["z1111"].Conn()
-       conn3 := s.testClusters["z3333"].Conn()
-       rootctx1, rootac1, rootkc1 := s.testClusters["z1111"].RootClients()
-       anonctx3, anonac3, _ := s.testClusters["z3333"].AnonymousClients()
+       conn1 := s.super.Conn("z1111")
+       conn3 := s.super.Conn("z3333")
+       rootctx1, rootac1, rootkc1 := s.super.RootClients("z1111")
+       anonctx3, anonac3, _ := s.super.AnonymousClients("z3333")
 
        // Make sure anonymous token was set
        c.Assert(anonac3.AuthToken, check.Not(check.Equals), "")
@@ -368,7 +359,7 @@ func (s *IntegrationSuite) TestGetCollectionAsAnonymous(c *check.C) {
        c.Check(err, check.IsNil)
 
        // Make a v2 token of the z3 anonymous user, and use it on z1
-       _, anonac1, _ := s.testClusters["z1111"].ClientsWithToken(outAuth.TokenV2())
+       _, anonac1, _ := s.super.ClientsWithToken("z1111", outAuth.TokenV2())
        outUser2, err := anonac1.CurrentUser()
        c.Check(err, check.IsNil)
        // z3 anonymous user will be mapped to the z1 anonymous user
@@ -394,14 +385,13 @@ func (s *IntegrationSuite) TestGetCollectionAsAnonymous(c *check.C) {
 // the z3333 anonymous user token should not prohibit the request from being
 // forwarded.
 func (s *IntegrationSuite) TestForwardAnonymousTokenToLoginCluster(c *check.C) {
-       conn1 := s.testClusters["z1111"].Conn()
-       s.testClusters["z3333"].Conn()
+       conn1 := s.super.Conn("z1111")
 
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       _, anonac3, _ := s.testClusters["z3333"].AnonymousClients()
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       _, anonac3, _ := s.super.AnonymousClients("z3333")
 
        // Make a user connection to z3333 (using a z1111 user, because that's the login cluster)
-       _, userac1, _, _ := s.testClusters["z3333"].UserClients(rootctx1, c, conn1, "user@example.com", true)
+       _, userac1, _, _ := s.super.UserClients("z3333", rootctx1, c, conn1, "user@example.com", true)
 
        // Get the anonymous user token for z3333
        var anon3Auth arvados.APIClientAuthorization
@@ -433,14 +423,14 @@ func (s *IntegrationSuite) TestForwardAnonymousTokenToLoginCluster(c *check.C) {
 // Get a token from the login cluster (z1111), use it to submit a
 // container request on z2222.
 func (s *IntegrationSuite) TestCreateContainerRequestWithFedToken(c *check.C) {
-       conn1 := s.testClusters["z1111"].Conn()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       _, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       _, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
 
        // Use ac2 to get the discovery doc with a blank token, so the
        // SDK doesn't magically pass the z1111 token to z2222 before
        // we're ready to start our test.
-       _, ac2, _ := s.testClusters["z2222"].ClientsWithToken("")
+       _, ac2, _ := s.super.ClientsWithToken("z2222", "")
        var dd map[string]interface{}
        err := ac2.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
        c.Assert(err, check.IsNil)
@@ -502,9 +492,9 @@ func (s *IntegrationSuite) TestCreateContainerRequestWithFedToken(c *check.C) {
 }
 
 func (s *IntegrationSuite) TestCreateContainerRequestWithBadToken(c *check.C) {
-       conn1 := s.testClusters["z1111"].Conn()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       _, ac1, _, au := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true)
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       _, ac1, _, au := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true)
 
        tests := []struct {
                name         string
@@ -539,9 +529,9 @@ func (s *IntegrationSuite) TestCreateContainerRequestWithBadToken(c *check.C) {
 }
 
 func (s *IntegrationSuite) TestRequestIDHeader(c *check.C) {
-       conn1 := s.testClusters["z1111"].Conn()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       userctx1, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true)
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       userctx1, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true)
 
        coll, err := conn1.CollectionCreate(userctx1, arvados.CreateOptions{})
        c.Check(err, check.IsNil)
@@ -613,7 +603,7 @@ func (s *IntegrationSuite) TestRequestIDHeader(c *check.C) {
 // to test tokens that are secret, so there is no API response that will give them back
 func (s *IntegrationSuite) dbConn(c *check.C, clusterID string) (*sql.DB, *sql.Conn) {
        ctx := context.Background()
-       db, err := sql.Open("postgres", s.testClusters[clusterID].Super.Cluster().PostgreSQL.Connection.String())
+       db, err := sql.Open("postgres", s.super.Cluster(clusterID).PostgreSQL.Connection.String())
        c.Assert(err, check.IsNil)
 
        conn, err := db.Conn(ctx)
@@ -633,9 +623,9 @@ func (s *IntegrationSuite) TestRuntimeTokenInCR(c *check.C) {
        db, dbconn := s.dbConn(c, "z1111")
        defer db.Close()
        defer dbconn.Close()
-       conn1 := s.testClusters["z1111"].Conn()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       userctx1, ac1, _, au := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true)
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       userctx1, ac1, _, au := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true)
 
        tests := []struct {
                name                 string
@@ -685,9 +675,9 @@ func (s *IntegrationSuite) TestRuntimeTokenInCR(c *check.C) {
 // one cluster with another cluster as the destination
 // and check the tokens are being handled properly
 func (s *IntegrationSuite) TestIntermediateCluster(c *check.C) {
-       conn1 := s.testClusters["z1111"].Conn()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       uctx1, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true)
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       uctx1, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true)
 
        tests := []struct {
                name                 string
@@ -717,20 +707,20 @@ func (s *IntegrationSuite) TestIntermediateCluster(c *check.C) {
 
 // Test for #17785
 func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) {
-       rootctx1, rootclnt1, _ := s.testClusters["z1111"].RootClients()
-       conn1 := s.testClusters["z1111"].Conn()
+       rootctx1, rootclnt1, _ := s.super.RootClients("z1111")
+       conn1 := s.super.Conn("z1111")
 
        // Make sure LoginCluster is properly configured
        for _, cls := range []string{"z1111", "z3333"} {
                c.Check(
-                       s.testClusters[cls].Config.Clusters[cls].Login.LoginCluster,
+                       s.super.Cluster(cls).Login.LoginCluster,
                        check.Equals, "z1111",
                        check.Commentf("incorrect LoginCluster config on cluster %q", cls))
        }
        // Get user's UUID & attempt to create a token for it on the remote cluster
-       _, _, _, user := s.testClusters["z1111"].UserClients(rootctx1, c, conn1,
+       _, _, _, user := s.super.UserClients("z1111", rootctx1, c, conn1,
                "user@example.com", true)
-       _, rootclnt3, _ := s.testClusters["z3333"].ClientsWithToken(rootclnt1.AuthToken)
+       _, rootclnt3, _ := s.super.ClientsWithToken("z3333", rootclnt1.AuthToken)
        var resp arvados.APIClientAuthorization
        err := rootclnt3.RequestAndDecode(
                &resp, "POST", "arvados/v1/api_client_authorizations", nil,
@@ -749,7 +739,7 @@ func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) {
        c.Assert(strings.HasPrefix(newTok, "v2/z1111-gj3su-"), check.Equals, true)
 
        // Confirm the token works and is from the correct user
-       _, rootclnt3bis, _ := s.testClusters["z3333"].ClientsWithToken(newTok)
+       _, rootclnt3bis, _ := s.super.ClientsWithToken("z3333", newTok)
        var curUser arvados.User
        err = rootclnt3bis.RequestAndDecode(
                &curUser, "GET", "arvados/v1/users/current", nil, nil,
@@ -758,7 +748,7 @@ func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) {
        c.Assert(curUser.UUID, check.Equals, user.UUID)
 
        // Request the ApiClientAuthorization list using the new token
-       _, userClient, _ := s.testClusters["z3333"].ClientsWithToken(newTok)
+       _, userClient, _ := s.super.ClientsWithToken("z3333", newTok)
        var acaLst arvados.APIClientAuthorizationList
        err = userClient.RequestAndDecode(
                &acaLst, "GET", "arvados/v1/api_client_authorizations", nil, nil,
@@ -768,15 +758,15 @@ func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) {
 
 // Test for bug #18076
 func (s *IntegrationSuite) TestStaleCachedUserRecord(c *check.C) {
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       _, rootclnt3, _ := s.testClusters["z3333"].RootClients()
-       conn1 := s.testClusters["z1111"].Conn()
-       conn3 := s.testClusters["z3333"].Conn()
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       _, rootclnt3, _ := s.super.RootClients("z3333")
+       conn1 := s.super.Conn("z1111")
+       conn3 := s.super.Conn("z3333")
 
        // Make sure LoginCluster is properly configured
        for _, cls := range []string{"z1111", "z3333"} {
                c.Check(
-                       s.testClusters[cls].Config.Clusters[cls].Login.LoginCluster,
+                       s.super.Cluster(cls).Login.LoginCluster,
                        check.Equals, "z1111",
                        check.Commentf("incorrect LoginCluster config on cluster %q", cls))
        }
@@ -792,7 +782,7 @@ func (s *IntegrationSuite) TestStaleCachedUserRecord(c *check.C) {
                // Create some users, request them on the federated cluster so they're cached.
                var users []arvados.User
                for userNr := 0; userNr < 2; userNr++ {
-                       _, _, _, user := s.testClusters["z1111"].UserClients(
+                       _, _, _, user := s.super.UserClients("z1111",
                                rootctx1,
                                c,
                                conn1,
@@ -871,15 +861,15 @@ func (s *IntegrationSuite) TestStaleCachedUserRecord(c *check.C) {
 
 // Test for bug #16263
 func (s *IntegrationSuite) TestListUsers(c *check.C) {
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       conn1 := s.testClusters["z1111"].Conn()
-       conn3 := s.testClusters["z3333"].Conn()
-       userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       conn1 := s.super.Conn("z1111")
+       conn3 := s.super.Conn("z3333")
+       userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
 
        // Make sure LoginCluster is properly configured
-       for cls := range s.testClusters {
+       for _, cls := range []string{"z1111", "z2222", "z3333"} {
                c.Check(
-                       s.testClusters[cls].Config.Clusters[cls].Login.LoginCluster,
+                       s.super.Cluster(cls).Login.LoginCluster,
                        check.Equals, "z1111",
                        check.Commentf("incorrect LoginCluster config on cluster %q", cls))
        }
@@ -939,12 +929,12 @@ func (s *IntegrationSuite) TestListUsers(c *check.C) {
 }
 
 func (s *IntegrationSuite) TestSetupUserWithVM(c *check.C) {
-       conn1 := s.testClusters["z1111"].Conn()
-       conn3 := s.testClusters["z3333"].Conn()
-       rootctx1, rootac1, _ := s.testClusters["z1111"].RootClients()
+       conn1 := s.super.Conn("z1111")
+       conn3 := s.super.Conn("z3333")
+       rootctx1, rootac1, _ := s.super.RootClients("z1111")
 
        // Create user on LoginCluster z1111
-       _, _, _, user := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+       _, _, _, user := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
 
        // Make a new root token (because rootClients() uses SystemRootToken)
        var outAuth arvados.APIClientAuthorization
@@ -952,7 +942,7 @@ func (s *IntegrationSuite) TestSetupUserWithVM(c *check.C) {
        c.Check(err, check.IsNil)
 
        // Make a v2 root token to communicate with z3333
-       rootctx3, rootac3, _ := s.testClusters["z3333"].ClientsWithToken(outAuth.TokenV2())
+       rootctx3, rootac3, _ := s.super.ClientsWithToken("z3333", outAuth.TokenV2())
 
        // Create VM on z3333
        var outVM arvados.VirtualMachine
@@ -1003,9 +993,9 @@ func (s *IntegrationSuite) TestSetupUserWithVM(c *check.C) {
 }
 
 func (s *IntegrationSuite) TestOIDCAccessTokenAuth(c *check.C) {
-       conn1 := s.testClusters["z1111"].Conn()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
 
        accesstoken := s.oidcprovider.ValidAccessToken()
 
@@ -1017,8 +1007,8 @@ func (s *IntegrationSuite) TestOIDCAccessTokenAuth(c *check.C) {
                {
                        c.Logf("save collection to %s", clusterID)
 
-                       conn := s.testClusters[clusterID].Conn()
-                       ctx, ac, kc := s.testClusters[clusterID].ClientsWithToken(accesstoken)
+                       conn := s.super.Conn(clusterID)
+                       ctx, ac, kc := s.super.ClientsWithToken(clusterID, accesstoken)
 
                        fs, err := coll.FileSystem(ac, kc)
                        c.Assert(err, check.IsNil)
@@ -1042,8 +1032,8 @@ func (s *IntegrationSuite) TestOIDCAccessTokenAuth(c *check.C) {
                for _, readClusterID := range []string{"z1111", "z2222", "z3333"} {
                        c.Logf("retrieve %s from %s", coll.UUID, readClusterID)
 
-                       conn := s.testClusters[readClusterID].Conn()
-                       ctx, ac, kc := s.testClusters[readClusterID].ClientsWithToken(accesstoken)
+                       conn := s.super.Conn(readClusterID)
+                       ctx, ac, kc := s.super.ClientsWithToken(readClusterID, accesstoken)
 
                        user, err := conn.UserGetCurrent(ctx, arvados.GetOptions{})
                        c.Assert(err, check.IsNil)
@@ -1071,11 +1061,11 @@ func (s *IntegrationSuite) TestForwardRuntimeTokenToLoginCluster(c *check.C) {
        db3, db3conn := s.dbConn(c, "z3333")
        defer db3.Close()
        defer db3conn.Close()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       rootctx3, _, _ := s.testClusters["z3333"].RootClients()
-       conn1 := s.testClusters["z1111"].Conn()
-       conn3 := s.testClusters["z3333"].Conn()
-       userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true)
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       rootctx3, _, _ := s.super.RootClients("z3333")
+       conn1 := s.super.Conn("z1111")
+       conn3 := s.super.Conn("z3333")
+       userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true)
 
        user1, err := conn1.UserGetCurrent(userctx1, arvados.GetOptions{})
        c.Assert(err, check.IsNil)
@@ -1113,7 +1103,7 @@ func (s *IntegrationSuite) TestForwardRuntimeTokenToLoginCluster(c *check.C) {
        row.Scan(&val)
        c.Assert(val.Valid, check.Equals, true)
        runtimeToken := "v2/" + ctr.AuthUUID + "/" + val.String
-       ctrctx, _, _ := s.testClusters["z3333"].ClientsWithToken(runtimeToken)
+       ctrctx, _, _ := s.super.ClientsWithToken("z3333", runtimeToken)
        c.Logf("container runtime token %+v", runtimeToken)
 
        _, err = conn3.UserGet(ctrctx, arvados.GetOptions{UUID: user1.UUID})
index d5fed3e29f819c83709a141e5e9c951635cb8586..71c119156abfb7eb24419884042d7b8b0bb29b5e 100644 (file)
@@ -5,13 +5,12 @@
 package main
 
 import (
-       "bytes"
+       "context"
        "net"
        "os"
-       "path/filepath"
+       "time"
 
        "git.arvados.org/arvados.git/lib/boot"
-       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
@@ -23,7 +22,7 @@ var _ = check.Suite(&FederationSuite{})
 var origAPIHost, origAPIToken string
 
 type FederationSuite struct {
-       testClusters map[string]*boot.TestCluster
+       super        *boot.Supervisor
        oidcprovider *arvadostest.OIDCProvider
 }
 
@@ -31,8 +30,6 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
        origAPIHost = os.Getenv("ARVADOS_API_HOST")
        origAPIToken = os.Getenv("ARVADOS_API_TOKEN")
 
-       cwd, _ := os.Getwd()
-
        s.oidcprovider = arvadostest.NewOIDCProvider(c)
        s.oidcprovider.AuthEmail = "user@example.com"
        s.oidcprovider.AuthEmailVerified = true
@@ -40,12 +37,8 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
        s.oidcprovider.ValidClientID = "clientid"
        s.oidcprovider.ValidClientSecret = "clientsecret"
 
-       s.testClusters = map[string]*boot.TestCluster{
-               "z1111": nil,
-               "z2222": nil,
-       }
        hostport := map[string]string{}
-       for id := range s.testClusters {
+       for _, id := range []string{"z1111", "z2222"} {
                hostport[id] = func() string {
                        // TODO: Instead of expecting random ports on
                        // 127.0.0.11, 22 to be race-safe, try
@@ -59,8 +52,9 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
                        return "127.0.0." + id[3:] + ":" + port
                }()
        }
-       for id := range s.testClusters {
-               yaml := `Clusters:
+       yaml := "Clusters:\n"
+       for id := range hostport {
+               yaml += `
   ` + id + `:
     Services:
       Controller:
@@ -104,30 +98,27 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
       LoginCluster: z1111
 `
                }
-
-               loader := config.NewLoader(bytes.NewBufferString(yaml), ctxlog.TestLogger(c))
-               loader.Path = "-"
-               loader.SkipLegacy = true
-               loader.SkipAPICalls = true
-               cfg, err := loader.Load()
-               c.Assert(err, check.IsNil)
-               tc := boot.NewTestCluster(
-                       filepath.Join(cwd, "..", ".."),
-                       id, cfg, "127.0.0."+id[3:], c.Log)
-               tc.Super.NoWorkbench1 = true
-               tc.Super.NoWorkbench2 = true
-               tc.Start()
-               s.testClusters[id] = tc
        }
-       for _, tc := range s.testClusters {
-               ok := tc.WaitReady()
-               c.Assert(ok, check.Equals, true)
+       s.super = &boot.Supervisor{
+               ClusterType:          "test",
+               ConfigYAML:           yaml,
+               Stderr:               ctxlog.LogWriter(c.Log),
+               NoWorkbench1:         true,
+               NoWorkbench2:         true,
+               OwnTemporaryDatabase: true,
        }
 
+       // Give up if startup takes longer than 3m
+       timeout := time.AfterFunc(3*time.Minute, s.super.Stop)
+       defer timeout.Stop()
+       s.super.Start(context.Background())
+       ok := s.super.WaitReady()
+       c.Assert(ok, check.Equals, true)
+
        // Activate user, make it admin.
-       conn1 := s.testClusters["z1111"].Conn()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
        user1, err := conn1.UserGetCurrent(userctx1, arvados.GetOptions{})
        c.Assert(err, check.IsNil)
        c.Assert(user1.IsAdmin, check.Equals, false)
@@ -142,25 +133,23 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
 }
 
 func (s *FederationSuite) TearDownSuite(c *check.C) {
-       for _, c := range s.testClusters {
-               c.Super.Stop()
-       }
+       s.super.Stop()
        _ = os.Setenv("ARVADOS_API_HOST", origAPIHost)
        _ = os.Setenv("ARVADOS_API_TOKEN", origAPIToken)
 }
 
 func (s *FederationSuite) TestGroupSyncingOnFederatedCluster(c *check.C) {
        // Get admin user's V2 token
-       conn1 := s.testClusters["z1111"].Conn()
-       rootctx1, _, _ := s.testClusters["z1111"].RootClients()
-       userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
        user1Auth, err := conn1.APIClientAuthorizationCurrent(userctx1, arvados.GetOptions{})
        c.Check(err, check.IsNil)
        userV2Token := user1Auth.TokenV2()
 
        // Get federated admin clients on z2222 to set up environment
-       conn2 := s.testClusters["z2222"].Conn()
-       userctx2, userac2, _ := s.testClusters["z2222"].ClientsWithToken(userV2Token)
+       conn2 := s.super.Conn("z2222")
+       userctx2, userac2, _ := s.super.ClientsWithToken("z2222", userV2Token)
        user2, err := conn2.UserGetCurrent(userctx2, arvados.GetOptions{})
        c.Check(err, check.IsNil)
        c.Check(user2.IsAdmin, check.Equals, true)
@@ -177,7 +166,7 @@ func (s *FederationSuite) TestGroupSyncingOnFederatedCluster(c *check.C) {
                Filters: []arvados.Filter{{
                        Attr:     "owner_uuid",
                        Operator: "=",
-                       Operand:  s.testClusters["z2222"].ClusterID + "-tpzed-000000000000000",
+                       Operand:  s.super.Cluster("z2222").ClusterID + "-tpzed-000000000000000",
                }, {
                        Attr:     "name",
                        Operator: "=",