From b1ed7c643f311605092991e01bcc3437130d6072 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 4 Apr 2022 10:18:07 -0400 Subject: [PATCH] 18699: boot.Supervisor support for multi-cluster config file. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/boot/cert.go | 29 +- lib/boot/cmd.go | 31 +-- lib/boot/helpers.go | 84 ++---- lib/boot/supervisor.go | 380 ++++++++++++++++++--------- lib/controller/integration_test.go | 218 ++++++++------- tools/sync-groups/federation_test.go | 73 +++-- 6 files changed, 441 insertions(+), 374 deletions(-) diff --git a/lib/boot/cert.go b/lib/boot/cert.go index 2b38dab053..916f9f53b2 100644 --- a/lib/boot/cert.go +++ b/lib/boot/cert.go @@ -26,20 +26,8 @@ func (createCertificates) String() string { } func (createCertificates) Run(ctx context.Context, fail func(error), super *Supervisor) error { - var san string - if net.ParseIP(super.ListenHost) != nil { - san += fmt.Sprintf(",IP:%s", super.ListenHost) - } else { - san += fmt.Sprintf(",DNS:%s", super.ListenHost) - } - hostname, err := os.Hostname() - if err != nil { - return fmt.Errorf("hostname: %w", err) - } - san += ",DNS:" + hostname - // Generate root key - err = super.RunProgram(ctx, super.tempdir, runOptions{}, "openssl", "genrsa", "-out", "rootCA.key", "4096") + err := super.RunProgram(ctx, super.tempdir, runOptions{}, "openssl", "genrsa", "-out", "rootCA.key", "4096") if err != nil { return err } @@ -58,7 +46,20 @@ func (createCertificates) Run(ctx context.Context, fail func(error), super *Supe if err != nil { return err } - err = ioutil.WriteFile(filepath.Join(super.tempdir, "server.cfg"), append(defaultconf, []byte(fmt.Sprintf("\n[SAN]\nsubjectAltName=DNS:localhost,DNS:localhost.localdomain%s\n", san))...), 0644) + hostname, err := os.Hostname() + if err != nil { + return fmt.Errorf("hostname: %w", err) + } + san := "DNS:localhost,DNS:localhost.localdomain,DNS:" + hostname + if super.ListenHost == hostname || super.ListenHost == "localhost" { + // already have it + } else if net.ParseIP(super.ListenHost) != nil { + san += fmt.Sprintf(",IP:%s", super.ListenHost) + } else { + san += fmt.Sprintf(",DNS:%s", super.ListenHost) + } + conf := append(defaultconf, []byte(fmt.Sprintf("\n[SAN]\nsubjectAltName=%s\n", san))...) 
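Illustration (not part of the patch): the subjectAltName logic added above, restated as a self-contained sketch. The hostname "devbox" in the sample output is assumed.

```go
package main

import (
	"fmt"
	"net"
	"os"
)

// sanList mirrors the logic added above: always cover the localhost
// names and the machine's hostname, then add ListenHost as an IP: or
// DNS: entry unless it is already covered.
func sanList(listenHost string) (string, error) {
	hostname, err := os.Hostname()
	if err != nil {
		return "", fmt.Errorf("hostname: %w", err)
	}
	san := "DNS:localhost,DNS:localhost.localdomain,DNS:" + hostname
	switch {
	case listenHost == hostname || listenHost == "localhost":
		// already covered above
	case net.ParseIP(listenHost) != nil:
		san += ",IP:" + listenHost
	default:
		san += ",DNS:" + listenHost
	}
	return san, nil
}

func main() {
	san, err := sanList("10.0.0.5")
	if err != nil {
		panic(err)
	}
	// With hostname "devbox" this prints:
	// subjectAltName=DNS:localhost,DNS:localhost.localdomain,DNS:devbox,IP:10.0.0.5
	fmt.Println("subjectAltName=" + san)
}
```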
+ err = ioutil.WriteFile(filepath.Join(super.tempdir, "server.cfg"), conf, 0644) if err != nil { return err } diff --git a/lib/boot/cmd.go b/lib/boot/cmd.go index 9c3047a7b6..6a32ab142d 100644 --- a/lib/boot/cmd.go +++ b/lib/boot/cmd.go @@ -13,7 +13,6 @@ import ( "time" "git.arvados.org/arvados.git/lib/cmd" - "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/sdk/go/ctxlog" ) @@ -56,17 +55,17 @@ func (bcmd bootCommand) run(ctx context.Context, prog string, args []string, std ctx, cancel := context.WithCancel(ctx) defer cancel() super := &Supervisor{ + Stdin: stdin, Stderr: stderr, logger: ctxlog.FromContext(ctx), } flags := flag.NewFlagSet(prog, flag.ContinueOnError) - loader := config.NewLoader(stdin, super.logger) - loader.SetupFlags(flags) versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0") + flags.StringVar(&super.ConfigPath, "config", "/etc/arvados/config.yml", "arvados config file `path`") flags.StringVar(&super.SourcePath, "source", ".", "arvados source tree `directory`") flags.StringVar(&super.ClusterType, "type", "production", "cluster `type`: development, test, or production") - flags.StringVar(&super.ListenHost, "listen-host", "localhost", "host name or interface address for service listeners") + flags.StringVar(&super.ListenHost, "listen-host", "localhost", "host name or interface address for external services, and internal services whose InternalURLs are not configured") flags.StringVar(&super.ControllerAddr, "controller-address", ":0", "desired controller address, `host:port` or `:port`") flags.StringVar(&super.Workbench2Source, "workbench2-source", "../arvados-workbench2", "path to arvados-workbench2 source tree") flags.BoolVar(&super.NoWorkbench1, "no-workbench1", false, "do not run workbench1") @@ -87,13 +86,7 @@ func (bcmd bootCommand) run(ctx context.Context, prog string, args []string, std return fmt.Errorf("cluster type must be 'development', 'test', or 'production'") } - loader.SkipAPICalls = true - cfg, err := loader.Load() - if err != nil { - return err - } - - super.Start(ctx, cfg, loader.Path) + super.Start(ctx) defer super.Stop() var timer *time.Timer @@ -101,17 +94,21 @@ func (bcmd bootCommand) run(ctx context.Context, prog string, args []string, std timer = time.AfterFunc(*timeout, super.Stop) } - url, ok := super.WaitReady() + ok := super.WaitReady() if timer != nil && !timer.Stop() { return errors.New("boot timed out") } else if !ok { super.logger.Error("boot failed") } else { - // Write controller URL to stdout. Nothing else goes - // to stdout, so this provides an easy way for a - // calling script to discover the controller URL when - // everything is ready. - fmt.Fprintln(stdout, url) + // Write each cluster's controller URL to stdout. + // Nothing else goes to stdout, so this allows a + // calling script to determine when the cluster is + // ready to use, and the controller's host:port (which + // may have been dynamically assigned depending on + // config/options). + for _, cc := range super.Clusters() { + fmt.Fprintln(stdout, cc.Services.Controller.ExternalURL) + } if *shutdown { super.Stop() // Wait for children to exit. 
Don't report the resulting shutdown error. diff --git a/lib/boot/helpers.go b/lib/boot/helpers.go index 731fc56c83..77036e9340 100644 --- a/lib/boot/helpers.go +++ b/lib/boot/helpers.go @@ -9,73 +9,24 @@ import ( "net/url" "git.arvados.org/arvados.git/lib/controller/rpc" - "git.arvados.org/arvados.git/lib/service" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/auth" - "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/keepclient" "gopkg.in/check.v1" ) -// TestCluster stores a working test cluster data -type TestCluster struct { - Super Supervisor - Config arvados.Config - ControllerURL *url.URL - ClusterID string -} - -type logger struct { - loggerfunc func(...interface{}) -} - -func (l logger) Log(args ...interface{}) { - l.loggerfunc(args) -} - -// NewTestCluster loads the provided configuration, and sets up a test cluster -// ready for being started. -func NewTestCluster(srcPath, clusterID string, cfg *arvados.Config, listenHost string, logWriter func(...interface{})) *TestCluster { - return &TestCluster{ - Super: Supervisor{ - SourcePath: srcPath, - ClusterType: "test", - ListenHost: listenHost, - ControllerAddr: ":0", - OwnTemporaryDatabase: true, - Stderr: &service.LogPrefixer{ - Writer: ctxlog.LogWriter(logWriter), - Prefix: []byte("[" + clusterID + "] ")}, - }, - Config: *cfg, - ClusterID: clusterID, - } -} - -// Start the test cluster. -func (tc *TestCluster) Start() { - tc.Super.Start(context.Background(), &tc.Config, "-") -} - -// WaitReady waits for all components to report healthy, and finishes setting -// up the TestCluster struct. -func (tc *TestCluster) WaitReady() bool { - au, ok := tc.Super.WaitReady() - if !ok { - return ok - } - u := url.URL(*au) - tc.ControllerURL = &u - return ok -} - // ClientsWithToken returns Context, Arvados.Client and keepclient structs // initialized to connect to the cluster with the supplied Arvados token. -func (tc *TestCluster) ClientsWithToken(token string) (context.Context, *arvados.Client, *keepclient.KeepClient) { - cl := tc.Config.Clusters[tc.ClusterID] - ctx := auth.NewContext(context.Background(), auth.NewCredentials(token)) - ac, err := arvados.NewClientFromConfig(&cl) +func (super *Supervisor) ClientsWithToken(clusterID, token string) (context.Context, *arvados.Client, *keepclient.KeepClient) { + cl := super.cluster + if super.children != nil { + cl = super.children[clusterID].cluster + } else if clusterID != cl.ClusterID { + panic("bad clusterID " + clusterID) + } + ctx := auth.NewContext(super.ctx, auth.NewCredentials(token)) + ac, err := arvados.NewClientFromConfig(cl) if err != nil { panic(err) } @@ -92,7 +43,7 @@ // initialize clients with the API token, set up the user and // optionally activate the user. Return client structs for // communicating with the cluster on behalf of the 'example' user.
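Illustration (not part of the patch): with TestCluster removed, a test suite boots one Supervisor for the whole federation and passes a cluster ID to each helper. A minimal sketch; the YAML literal and token are placeholders.

```go
package example

import (
	"context"
	"log"

	"git.arvados.org/arvados.git/lib/boot"
)

// bootFederation boots every cluster in multiClusterYAML under a
// single Supervisor, then gets clients for one of them.
func bootFederation(multiClusterYAML, userToken string) {
	super := &boot.Supervisor{
		ClusterType:          "test",
		ConfigYAML:           multiClusterYAML, // placeholder: defines z1111, z2222, ...
		NoWorkbench1:         true,
		NoWorkbench2:         true,
		OwnTemporaryDatabase: true,
	}
	super.Start(context.Background())
	defer super.Stop()
	if !super.WaitReady() {
		log.Fatal("boot failed")
	}
	// Helpers that used to live on boot.TestCluster now take a
	// cluster ID explicitly.
	ctx, ac, kc := super.ClientsWithToken("z1111", userToken)
	_, _, _ = ctx, ac, kc
}
```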
-func (tc *TestCluster) UserClients(rootctx context.Context, c *check.C, conn *rpc.Conn, authEmail string, activate bool) (context.Context, *arvados.Client, *keepclient.KeepClient, arvados.User) { +func (super *Supervisor) UserClients(clusterID string, rootctx context.Context, c *check.C, conn *rpc.Conn, authEmail string, activate bool) (context.Context, *arvados.Client, *keepclient.KeepClient, arvados.User) { login, err := conn.UserSessionCreate(rootctx, rpc.UserSessionCreateOptions{ ReturnTo: ",https://example.com", AuthInfo: rpc.UserSessionAuthInfo{ @@ -107,7 +58,7 @@ func (tc *TestCluster) UserClients(rootctx context.Context, c *check.C, conn *rp c.Assert(err, check.IsNil) userToken := redirURL.Query().Get("api_token") c.Logf("user token: %q", userToken) - ctx, ac, kc := tc.ClientsWithToken(userToken) + ctx, ac, kc := super.ClientsWithToken(clusterID, userToken) user, err := conn.UserGetCurrent(ctx, arvados.GetOptions{}) c.Assert(err, check.IsNil) _, err = conn.UserSetup(rootctx, arvados.UserSetupOptions{UUID: user.UUID}) @@ -127,18 +78,19 @@ func (tc *TestCluster) UserClients(rootctx context.Context, c *check.C, conn *rp // RootClients returns Context, arvados.Client and keepclient structs initialized // to communicate with the cluster as the system root user. -func (tc *TestCluster) RootClients() (context.Context, *arvados.Client, *keepclient.KeepClient) { - return tc.ClientsWithToken(tc.Config.Clusters[tc.ClusterID].SystemRootToken) +func (super *Supervisor) RootClients(clusterID string) (context.Context, *arvados.Client, *keepclient.KeepClient) { + return super.ClientsWithToken(clusterID, super.Cluster(clusterID).SystemRootToken) } // AnonymousClients returns Context, arvados.Client and keepclient structs initialized // to communicate with the cluster as the anonymous user. -func (tc *TestCluster) AnonymousClients() (context.Context, *arvados.Client, *keepclient.KeepClient) { - return tc.ClientsWithToken(tc.Config.Clusters[tc.ClusterID].Users.AnonymousUserToken) +func (super *Supervisor) AnonymousClients(clusterID string) (context.Context, *arvados.Client, *keepclient.KeepClient) { + return super.ClientsWithToken(clusterID, super.Cluster(clusterID).Users.AnonymousUserToken) } // Conn gets rpc connection struct initialized to communicate with the // specified cluster. -func (tc *TestCluster) Conn() *rpc.Conn { - return rpc.NewConn(tc.ClusterID, tc.ControllerURL, true, rpc.PassthroughTokenProvider) +func (super *Supervisor) Conn(clusterID string) *rpc.Conn { + controllerURL := url.URL(super.Cluster(clusterID).Services.Controller.ExternalURL) + return rpc.NewConn(clusterID, &controllerURL, true, rpc.PassthroughTokenProvider) } diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go index 7daceccb93..b56e5ac725 100644 --- a/lib/boot/supervisor.go +++ b/lib/boot/supervisor.go @@ -37,25 +37,47 @@ import ( ) type Supervisor struct { - SourcePath string // e.g., /home/username/src/arvados - SourceVersion string // e.g., acbd1324... - ClusterType string // e.g., production - ListenHost string // e.g., localhost - ControllerAddr string // e.g., 127.0.0.1:8000 - Workbench2Source string // e.g., /home/username/src/arvados-workbench2 + // Config file location like "/etc/arvados/config.yml", or "-" + // to read from Stdin (see below). + ConfigPath string + // Literal config file (useful for test suites). If non-empty, + // this is used instead of ConfigPath. + ConfigYAML string + // Path to arvados source tree. Only used for dev/test + // clusters. 
+ SourcePath string + // Version number to build into binaries. Only used for + // dev/test clusters. + SourceVersion string + // "production", "development", or "test". + ClusterType string + // Listening address for external services, and internal + // services whose InternalURLs are not explicitly configured. + // If blank, listen on the configured controller ExternalURL + // host; if that is also blank, listen on all addresses + // (0.0.0.0). + ListenHost string + // Default host:port for controller ExternalURL if not + // explicitly configured in config file. If blank, use a + // random port on ListenHost. + ControllerAddr string + // Path to arvados-workbench2 source tree checkout. + Workbench2Source string NoWorkbench1 bool NoWorkbench2 bool OwnTemporaryDatabase bool + Stdin io.Reader Stderr io.Writer - logger logrus.FieldLogger - cluster *arvados.Cluster + logger logrus.FieldLogger + cluster *arvados.Cluster // nil if this is a multi-cluster supervisor + children map[string]*Supervisor // nil if this is a single-cluster supervisor ctx context.Context cancel context.CancelFunc - done chan struct{} // closed when child procs/services have shut down - err error // error that caused shutdown (valid when done is closed) - healthChecker *health.Aggregator + done chan struct{} // closed when child procs/services have shut down + err error // error that caused shutdown (valid when done is closed) + healthChecker *health.Aggregator // nil if this is a multi-cluster supervisor, or still booting tasksReady map[string]chan bool waitShutdown sync.WaitGroup @@ -66,73 +88,161 @@ type Supervisor struct { environ []string // for child processes } -func (super *Supervisor) Cluster() *arvados.Cluster { return super.cluster } +func (super *Supervisor) Clusters() map[string]*arvados.Cluster { + m := map[string]*arvados.Cluster{} + if super.cluster != nil { + m[super.cluster.ClusterID] = super.cluster + } + for id, super2 := range super.children { + m[id] = super2.Cluster("") + } + return m +} + +func (super *Supervisor) Cluster(id string) *arvados.Cluster { + if super.children != nil { + return super.children[id].Cluster(id) + } else { + return super.cluster + } +} -func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) { +func (super *Supervisor) Start(ctx context.Context) { + super.logger = ctxlog.FromContext(ctx) super.ctx, super.cancel = context.WithCancel(ctx) super.done = make(chan struct{}) + sigch := make(chan os.Signal) + signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(sigch) go func() { - defer close(super.done) + for sig := range sigch { + super.logger.WithField("signal", sig).Info("caught signal") + if super.err == nil { + super.err = fmt.Errorf("caught signal %s", sig) + } + super.cancel() + } + }() - sigch := make(chan os.Signal) - signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM) - defer signal.Stop(sigch) - go func() { - for sig := range sigch { - super.logger.WithField("signal", sig).Info("caught signal") - if super.err == nil { - super.err = fmt.Errorf("caught signal %s", sig) - } - super.cancel() + hupch := make(chan os.Signal) + signal.Notify(hupch, syscall.SIGHUP) + defer signal.Stop(hupch) + go func() { + for sig := range hupch { + super.logger.WithField("signal", sig).Info("caught signal") + if super.err == nil { + super.err = errNeedConfigReload } - }() + super.cancel() + } + }() + + loaderStdin := super.Stdin + if super.ConfigYAML != "" { + loaderStdin = bytes.NewBufferString(super.ConfigYAML) + } + loader := 
config.NewLoader(loaderStdin, super.logger) + loader.SkipLegacy = true + loader.SkipAPICalls = true + loader.Path = super.ConfigPath + if super.ConfigYAML != "" { + loader.Path = "-" + } + cfg, err := loader.Load() + if err != nil { + super.err = err + close(super.done) + super.cancel() + return + } + + if super.ConfigPath != "" && super.ConfigPath != "-" && cfg.AutoReloadConfig { + go watchConfig(super.ctx, super.logger, super.ConfigPath, copyConfig(cfg), func() { + if super.err == nil { + super.err = errNeedConfigReload + } + super.cancel() + }) + } - hupch := make(chan os.Signal) - signal.Notify(hupch, syscall.SIGHUP) - defer signal.Stop(hupch) + if len(cfg.Clusters) > 1 { + super.startFederation(cfg) go func() { - for sig := range hupch { - super.logger.WithField("signal", sig).Info("caught signal") + defer super.cancel() + defer close(super.done) + for _, super2 := range super.children { + err := super2.Wait() if super.err == nil { - super.err = errNeedConfigReload + super.err = err } - super.cancel() } }() - - if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig { - go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() { + } else { + go func() { + defer super.cancel() + defer close(super.done) + super.cluster, super.err = cfg.GetCluster("") + if super.err != nil { + return + } + err := super.runCluster() + if err != nil { + super.logger.WithError(err).Info("supervisor shut down") if super.err == nil { - super.err = errNeedConfigReload + super.err = err } - super.cancel() - }) - } - - err := super.run(cfg) - if err != nil { - super.logger.WithError(err).Warn("supervisor shut down") - if super.err == nil { - super.err = err } - } - }() + }() + } } +// Wait returns when all child processes and goroutines have exited. func (super *Supervisor) Wait() error { <-super.done return super.err } -func (super *Supervisor) run(cfg *arvados.Config) error { - defer super.cancel() +// startFederation starts a child Supervisor for each cluster in the +// given config. Each is a copy of the original/parent with the +// original config reduced to a single cluster. 
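Illustration (not part of the patch): after a multi-cluster Start, the parent Supervisor exposes every booted cluster through Clusters(). This sketch uses the same loop the boot command now runs to print one controller URL per cluster (see the lib/boot/cmd.go hunk above); the function name is hypothetical.

```go
package example

import (
	"fmt"

	"git.arvados.org/arvados.git/lib/boot"
)

// printControllers sketches what the boot command does once WaitReady
// returns: print each booted cluster's controller ExternalURL, whether
// super is a single-cluster or multi-cluster (parent) supervisor.
func printControllers(super *boot.Supervisor) {
	for id, cc := range super.Clusters() {
		fmt.Println(id, cc.Services.Controller.ExternalURL)
	}
}
```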
+func (super *Supervisor) startFederation(cfg *arvados.Config) { + super.children = map[string]*Supervisor{} + for id, cc := range cfg.Clusters { + super2 := *super + yaml, err := json.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{id: cc}}) + if err != nil { + panic(fmt.Sprintf("json.Marshal partial config: %s", err)) + } + super2.ConfigYAML = string(yaml) + super2.ConfigPath = "-" + super2.children = nil + + if super2.ClusterType == "test" { + super2.Stderr = &service.LogPrefixer{ + Writer: super.Stderr, + Prefix: []byte("[" + id + "] "), + } + } + super2.Start(super.ctx) + super.children[id] = &super2 + } +} +func (super *Supervisor) runCluster() error { cwd, err := os.Getwd() if err != nil { return err } - if !strings.HasPrefix(super.SourcePath, "/") { + if super.ClusterType == "test" && super.SourcePath == "" { + // When invoked by test suite, default to current + // source tree + buf, err := exec.Command("git", "rev-parse", "--show-toplevel").CombinedOutput() + if err != nil { + return fmt.Errorf("git rev-parse: %w", err) + } + super.SourcePath = strings.TrimSuffix(string(buf), "\n") + } else if !strings.HasPrefix(super.SourcePath, "/") { super.SourcePath = filepath.Join(cwd, super.SourcePath) } super.SourcePath, err = filepath.EvalSymlinks(super.SourcePath) @@ -140,6 +250,18 @@ func (super *Supervisor) run(cfg *arvados.Config) error { return err } + if super.ListenHost == "" { + if urlhost := super.cluster.Services.Controller.ExternalURL.Host; urlhost != "" { + if h, _, _ := net.SplitHostPort(urlhost); h != "" { + super.ListenHost = h + } else { + super.ListenHost = urlhost + } + } else { + super.ListenHost = "0.0.0.0" + } + } + // Choose bin and temp dirs: /var/lib/arvados/... in // production, transient tempdir otherwise. if super.ClusterType == "production" { @@ -164,7 +286,7 @@ func (super *Supervisor) run(cfg *arvados.Config) error { // Fill in any missing config keys, and write the resulting // config in the temp dir for child services to use. - err = super.autofillConfig(cfg) + err = super.autofillConfig() if err != nil { return err } @@ -173,7 +295,9 @@ func (super *Supervisor) run(cfg *arvados.Config) error { return err } defer conffile.Close() - err = json.NewEncoder(conffile).Encode(cfg) + err = json.NewEncoder(conffile).Encode(arvados.Config{ + Clusters: map[string]arvados.Cluster{ + super.cluster.ClusterID: *super.cluster}}) if err != nil { return err } @@ -193,10 +317,6 @@ func (super *Supervisor) run(cfg *arvados.Config) error { super.prependEnv("PATH", super.tempdir+"/bin:") } - super.cluster, err = cfg.GetCluster("") - if err != nil { - return err - } // Now that we have the config, replace the bootstrap logger // with a new one according to the logging config. 
loglevel := super.cluster.SystemLogs.LogLevel @@ -307,36 +427,60 @@ func (super *Supervisor) run(cfg *arvados.Config) error { } func (super *Supervisor) wait(ctx context.Context, tasks ...supervisedTask) error { + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() for _, task := range tasks { ch, ok := super.tasksReady[task.String()] if !ok { return fmt.Errorf("no such task: %s", task) } super.logger.WithField("task", task.String()).Info("waiting") - select { - case <-ch: - super.logger.WithField("task", task.String()).Info("ready") - case <-ctx.Done(): - super.logger.WithField("task", task.String()).Info("task was never ready") - return ctx.Err() + for { + select { + case <-ch: + super.logger.WithField("task", task.String()).Info("ready") + case <-ctx.Done(): + super.logger.WithField("task", task.String()).Info("task was never ready") + return ctx.Err() + case <-ticker.C: + super.logger.WithField("task", task.String()).Info("still waiting...") + continue + } + break } } return nil } +// Stop shuts down all child processes and goroutines, and returns +// when all of them have exited. func (super *Supervisor) Stop() { super.cancel() <-super.done } -func (super *Supervisor) WaitReady() (*arvados.URL, bool) { +// WaitReady waits for the cluster(s) to be ready to handle requests, +// then returns true. If startup fails, it returns false. +func (super *Supervisor) WaitReady() bool { + if super.children != nil { + for id, super2 := range super.children { + super.logger.Infof("waiting for %s to be ready", id) + if !super2.WaitReady() { + super.logger.Infof("%s startup failed", id) + return false + } + super.logger.Infof("%s is ready", id) + } + super.logger.Info("all clusters are ready") + return true + } ticker := time.NewTicker(time.Second) defer ticker.Stop() for waiting := "all"; waiting != ""; { select { case <-ticker.C: case <-super.ctx.Done(): - return nil, false + return false } if super.healthChecker == nil { // not set up yet @@ -358,8 +502,7 @@ func (super *Supervisor) WaitReady() (*arvados.URL, bool) { super.logger.WithField("targets", waiting[1:]).Info("waiting") } } - u := super.cluster.Services.Controller.ExternalURL - return &u, true + return true } func (super *Supervisor) prependEnv(key, prepend string) { @@ -631,11 +774,7 @@ func (super *Supervisor) RunProgram(ctx context.Context, dir string, opts runOpt return nil } -func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { - cluster, err := cfg.GetCluster("") - if err != nil { - return err - } +func (super *Supervisor) autofillConfig() error { usedPort := map[string]bool{} nextPort := func(host string) (string, error) { for { @@ -653,39 +792,39 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { return port, nil } } - if cluster.Services.Controller.ExternalURL.Host == "" { + if super.cluster.Services.Controller.ExternalURL.Host == "" { h, p, err := net.SplitHostPort(super.ControllerAddr) - if err != nil { - return fmt.Errorf("SplitHostPort(ControllerAddr): %w", err) + if err != nil && super.ControllerAddr != "" { + return fmt.Errorf("SplitHostPort(ControllerAddr %q): %w", super.ControllerAddr, err) } if h == "" { h = super.ListenHost } - if p == "0" { + if p == "0" || p == "" { p, err = nextPort(h) if err != nil { return err } } - cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"} + super.cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"} } - u := 
url.URL(cluster.Services.Controller.ExternalURL) + u := url.URL(super.cluster.Services.Controller.ExternalURL) defaultExtHost := u.Hostname() for _, svc := range []*arvados.Service{ - &cluster.Services.Controller, - &cluster.Services.DispatchCloud, - &cluster.Services.GitHTTP, - &cluster.Services.Health, - &cluster.Services.Keepproxy, - &cluster.Services.Keepstore, - &cluster.Services.RailsAPI, - &cluster.Services.WebDAV, - &cluster.Services.WebDAVDownload, - &cluster.Services.Websocket, - &cluster.Services.Workbench1, - &cluster.Services.Workbench2, + &super.cluster.Services.Controller, + &super.cluster.Services.DispatchCloud, + &super.cluster.Services.GitHTTP, + &super.cluster.Services.Health, + &super.cluster.Services.Keepproxy, + &super.cluster.Services.Keepstore, + &super.cluster.Services.RailsAPI, + &super.cluster.Services.WebDAV, + &super.cluster.Services.WebDAVDownload, + &super.cluster.Services.Websocket, + &super.cluster.Services.Workbench1, + &super.cluster.Services.Workbench2, } { - if svc == &cluster.Services.DispatchCloud && super.ClusterType == "test" { + if svc == &super.cluster.Services.DispatchCloud && super.ClusterType == "test" { continue } if svc.ExternalURL.Host == "" { @@ -694,21 +833,21 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { return err } host := net.JoinHostPort(defaultExtHost, port) - if svc == &cluster.Services.Controller || - svc == &cluster.Services.GitHTTP || - svc == &cluster.Services.Health || - svc == &cluster.Services.Keepproxy || - svc == &cluster.Services.WebDAV || - svc == &cluster.Services.WebDAVDownload || - svc == &cluster.Services.Workbench1 || - svc == &cluster.Services.Workbench2 { + if svc == &super.cluster.Services.Controller || + svc == &super.cluster.Services.GitHTTP || + svc == &super.cluster.Services.Health || + svc == &super.cluster.Services.Keepproxy || + svc == &super.cluster.Services.WebDAV || + svc == &super.cluster.Services.WebDAVDownload || + svc == &super.cluster.Services.Workbench1 || + svc == &super.cluster.Services.Workbench2 { svc.ExternalURL = arvados.URL{Scheme: "https", Host: host, Path: "/"} - } else if svc == &cluster.Services.Websocket { + } else if svc == &super.cluster.Services.Websocket { svc.ExternalURL = arvados.URL{Scheme: "wss", Host: host, Path: "/websocket"} } } - if super.NoWorkbench1 && svc == &cluster.Services.Workbench1 || - super.NoWorkbench2 && svc == &cluster.Services.Workbench2 { + if super.NoWorkbench1 && svc == &super.cluster.Services.Workbench1 || + super.NoWorkbench2 && svc == &super.cluster.Services.Workbench2 { // When workbench1 is disabled, it gets an // ExternalURL (so we have a valid listening // port to write in our Nginx config) but no @@ -728,26 +867,26 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { } } if super.ClusterType != "production" { - if cluster.SystemRootToken == "" { - cluster.SystemRootToken = randomHexString(64) + if super.cluster.SystemRootToken == "" { + super.cluster.SystemRootToken = randomHexString(64) } - if cluster.ManagementToken == "" { - cluster.ManagementToken = randomHexString(64) + if super.cluster.ManagementToken == "" { + super.cluster.ManagementToken = randomHexString(64) } - if cluster.Collections.BlobSigningKey == "" { - cluster.Collections.BlobSigningKey = randomHexString(64) + if super.cluster.Collections.BlobSigningKey == "" { + super.cluster.Collections.BlobSigningKey = randomHexString(64) } - if cluster.Users.AnonymousUserToken == "" { - cluster.Users.AnonymousUserToken = randomHexString(64) + if 
super.cluster.Users.AnonymousUserToken == "" { + super.cluster.Users.AnonymousUserToken = randomHexString(64) } - if cluster.Containers.DispatchPrivateKey == "" { + if super.cluster.Containers.DispatchPrivateKey == "" { buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch")) if err != nil { return err } - cluster.Containers.DispatchPrivateKey = string(buf) + super.cluster.Containers.DispatchPrivateKey = string(buf) } - cluster.TLS.Insecure = true + super.cluster.TLS.Insecure = true } if super.ClusterType == "test" { // Add a second keepstore process. @@ -756,13 +895,13 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { return err } host := net.JoinHostPort(super.ListenHost, port) - cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: host, Path: "/"}] = arvados.ServiceInstance{} + super.cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: host, Path: "/"}] = arvados.ServiceInstance{} // Create a directory-backed volume for each keepstore // process. - cluster.Volumes = map[string]arvados.Volume{} - for url := range cluster.Services.Keepstore.InternalURLs { - volnum := len(cluster.Volumes) + super.cluster.Volumes = map[string]arvados.Volume{} + for url := range super.cluster.Services.Keepstore.InternalURLs { + volnum := len(super.cluster.Volumes) datadir := fmt.Sprintf("%s/keep%d.data", super.tempdir, volnum) if _, err = os.Stat(datadir + "/."); err == nil { } else if !os.IsNotExist(err) { @@ -770,7 +909,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { } else if err = os.Mkdir(datadir, 0755); err != nil { return err } - cluster.Volumes[fmt.Sprintf(cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{ + super.cluster.Volumes[fmt.Sprintf(super.cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{ Driver: "Directory", DriverParameters: json.RawMessage(fmt.Sprintf(`{"Root":%q}`, datadir)), AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{ @@ -783,7 +922,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { }, } } - cluster.StorageClasses = map[string]arvados.StorageClassConfig{ + super.cluster.StorageClasses = map[string]arvados.StorageClassConfig{ "default": {Default: true}, "foo": {}, "bar": {}, @@ -794,7 +933,7 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { if err != nil { return err } - cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{ + super.cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{ "client_encoding": "utf8", "host": "localhost", "port": port, @@ -803,8 +942,6 @@ func (super *Supervisor) autofillConfig(cfg *arvados.Config) error { "password": "insecure_arvados_test", } } - - cfg.Clusters[cluster.ClusterID] = *cluster return nil } @@ -876,6 +1013,7 @@ func availablePort(host string) (string, error) { // Try to connect to addr until it works, then close ch. Give up if // ctx cancels. 
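Illustration (not part of the patch): nextPort/availablePort above hand out ports for services to listen on. The body of availablePort is not shown in this diff; a minimal version of the usual trick, bind port 0 and read back the kernel-assigned port, could look like this (freePort is a hypothetical name).

```go
package main

import (
	"fmt"
	"net"
)

// freePort asks the kernel for any free TCP port on host by listening
// on port 0, then releases the listener and returns the port number.
func freePort(host string) (string, error) {
	ln, err := net.Listen("tcp", net.JoinHostPort(host, "0"))
	if err != nil {
		return "", err
	}
	defer ln.Close()
	_, port, err := net.SplitHostPort(ln.Addr().String())
	return port, err
}

func main() {
	port, err := freePort("127.0.0.1")
	if err != nil {
		panic(err)
	}
	fmt.Println("picked port", port)
}
```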
func waitForConnect(ctx context.Context, addr string) error { + ctxlog.FromContext(ctx).WithField("addr", addr).Info("waitForConnect") dialer := net.Dialer{Timeout: time.Second} for ctx.Err() == nil { conn, err := dialer.DialContext(ctx, "tcp", addr) diff --git a/lib/controller/integration_test.go b/lib/controller/integration_test.go index 50cf89c0d4..44be17c77f 100644 --- a/lib/controller/integration_test.go +++ b/lib/controller/integration_test.go @@ -17,13 +17,12 @@ import ( "net/http" "os" "os/exec" - "path/filepath" "strconv" "strings" "sync" + "time" "git.arvados.org/arvados.git/lib/boot" - "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadostest" "git.arvados.org/arvados.git/sdk/go/ctxlog" @@ -34,13 +33,11 @@ import ( var _ = check.Suite(&IntegrationSuite{}) type IntegrationSuite struct { - testClusters map[string]*boot.TestCluster + super *boot.Supervisor oidcprovider *arvadostest.OIDCProvider } func (s *IntegrationSuite) SetUpSuite(c *check.C) { - cwd, _ := os.Getwd() - s.oidcprovider = arvadostest.NewOIDCProvider(c) s.oidcprovider.AuthEmail = "user@example.com" s.oidcprovider.AuthEmailVerified = true @@ -48,13 +45,8 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) { s.oidcprovider.ValidClientID = "clientid" s.oidcprovider.ValidClientSecret = "clientsecret" - s.testClusters = map[string]*boot.TestCluster{ - "z1111": nil, - "z2222": nil, - "z3333": nil, - } hostport := map[string]string{} - for id := range s.testClusters { + for _, id := range []string{"z1111", "z2222", "z3333"} { hostport[id] = func() string { // TODO: Instead of expecting random ports on // 127.0.0.11, 22, 33 to be race-safe, try @@ -68,8 +60,9 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) { return "127.0.0." 
+ id[3:] + ":" + port }() } - for id := range s.testClusters { - yaml := `Clusters: + yaml := "Clusters:\n" + for id := range hostport { + yaml += ` ` + id + `: Services: Controller: @@ -124,37 +117,35 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) { LoginCluster: z1111 ` } - - loader := config.NewLoader(bytes.NewBufferString(yaml), ctxlog.TestLogger(c)) - loader.Path = "-" - loader.SkipLegacy = true - loader.SkipAPICalls = true - cfg, err := loader.Load() - c.Assert(err, check.IsNil) - tc := boot.NewTestCluster( - filepath.Join(cwd, "..", ".."), - id, cfg, "127.0.0."+id[3:], c.Log) - tc.Super.NoWorkbench1 = true - tc.Super.NoWorkbench2 = true - tc.Start() - s.testClusters[id] = tc } - for _, tc := range s.testClusters { - ok := tc.WaitReady() - c.Assert(ok, check.Equals, true) + s.super = &boot.Supervisor{ + ClusterType: "test", + ConfigYAML: yaml, + Stderr: ctxlog.LogWriter(c.Log), + NoWorkbench1: true, + NoWorkbench2: true, + OwnTemporaryDatabase: true, } + + // Give up if startup takes longer than 3m + timeout := time.AfterFunc(3*time.Minute, s.super.Stop) + defer timeout.Stop() + s.super.Start(context.Background()) + ok := s.super.WaitReady() + c.Assert(ok, check.Equals, true) } func (s *IntegrationSuite) TearDownSuite(c *check.C) { - for _, c := range s.testClusters { - c.Super.Stop() + if s.super != nil { + s.super.Stop() + s.super.Wait() } } func (s *IntegrationSuite) TestDefaultStorageClassesOnCollections(c *check.C) { - conn := s.testClusters["z1111"].Conn() - rootctx, _, _ := s.testClusters["z1111"].RootClients() - userctx, _, kc, _ := s.testClusters["z1111"].UserClients(rootctx, c, conn, s.oidcprovider.AuthEmail, true) + conn := s.super.Conn("z1111") + rootctx, _, _ := s.super.RootClients("z1111") + userctx, _, kc, _ := s.super.UserClients("z1111", rootctx, c, conn, s.oidcprovider.AuthEmail, true) c.Assert(len(kc.DefaultStorageClasses) > 0, check.Equals, true) coll, err := conn.CollectionCreate(userctx, arvados.CreateOptions{}) c.Assert(err, check.IsNil) @@ -162,10 +153,10 @@ func (s *IntegrationSuite) TestDefaultStorageClassesOnCollections(c *check.C) { } func (s *IntegrationSuite) TestGetCollectionByPDH(c *check.C) { - conn1 := s.testClusters["z1111"].Conn() - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - conn3 := s.testClusters["z3333"].Conn() - userctx1, ac1, kc1, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + conn3 := s.super.Conn("z3333") + userctx1, ac1, kc1, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) // Create the collection to find its PDH (but don't save it // anywhere yet) @@ -201,11 +192,11 @@ func (s *IntegrationSuite) TestGetCollectionByPDH(c *check.C) { // Tests bug #18004 func (s *IntegrationSuite) TestRemoteUserAndTokenCacheRace(c *check.C) { - conn1 := s.testClusters["z1111"].Conn() - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - rootctx2, _, _ := s.testClusters["z2222"].RootClients() - conn2 := s.testClusters["z2222"].Conn() - userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user2@example.com", true) + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + rootctx2, _, _ := s.super.RootClients("z2222") + conn2 := s.super.Conn("z2222") + userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user2@example.com", true) var wg1, wg2 sync.WaitGroup creqs := 100 @@ -250,13 +241,13 @@ func (s 
*IntegrationSuite) TestS3WithFederatedToken(c *check.C) { testText := "IntegrationSuite.TestS3WithFederatedToken" - conn1 := s.testClusters["z1111"].Conn() - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - userctx1, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) - conn3 := s.testClusters["z3333"].Conn() + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + userctx1, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) + conn3 := s.super.Conn("z3333") createColl := func(clusterID string) arvados.Collection { - _, ac, kc := s.testClusters[clusterID].ClientsWithToken(ac1.AuthToken) + _, ac, kc := s.super.ClientsWithToken(clusterID, ac1.AuthToken) var coll arvados.Collection fs, err := coll.FileSystem(ac, kc) c.Assert(err, check.IsNil) @@ -268,7 +259,7 @@ func (s *IntegrationSuite) TestS3WithFederatedToken(c *check.C) { c.Assert(err, check.IsNil) mtxt, err := fs.MarshalManifest(".") c.Assert(err, check.IsNil) - coll, err = s.testClusters[clusterID].Conn().CollectionCreate(userctx1, arvados.CreateOptions{Attrs: map[string]interface{}{ + coll, err = s.super.Conn(clusterID).CollectionCreate(userctx1, arvados.CreateOptions{Attrs: map[string]interface{}{ "manifest_text": mtxt, }}) c.Assert(err, check.IsNil) @@ -316,10 +307,10 @@ func (s *IntegrationSuite) TestS3WithFederatedToken(c *check.C) { } func (s *IntegrationSuite) TestGetCollectionAsAnonymous(c *check.C) { - conn1 := s.testClusters["z1111"].Conn() - conn3 := s.testClusters["z3333"].Conn() - rootctx1, rootac1, rootkc1 := s.testClusters["z1111"].RootClients() - anonctx3, anonac3, _ := s.testClusters["z3333"].AnonymousClients() + conn1 := s.super.Conn("z1111") + conn3 := s.super.Conn("z3333") + rootctx1, rootac1, rootkc1 := s.super.RootClients("z1111") + anonctx3, anonac3, _ := s.super.AnonymousClients("z3333") // Make sure anonymous token was set c.Assert(anonac3.AuthToken, check.Not(check.Equals), "") @@ -368,7 +359,7 @@ func (s *IntegrationSuite) TestGetCollectionAsAnonymous(c *check.C) { c.Check(err, check.IsNil) // Make a v2 token of the z3 anonymous user, and use it on z1 - _, anonac1, _ := s.testClusters["z1111"].ClientsWithToken(outAuth.TokenV2()) + _, anonac1, _ := s.super.ClientsWithToken("z1111", outAuth.TokenV2()) outUser2, err := anonac1.CurrentUser() c.Check(err, check.IsNil) // z3 anonymous user will be mapped to the z1 anonymous user @@ -394,14 +385,13 @@ func (s *IntegrationSuite) TestGetCollectionAsAnonymous(c *check.C) { // the z3333 anonymous user token should not prohibit the request from being // forwarded. 
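Illustration (not part of the patch): several of these tests pass "v2" tokens between clusters. The format, built later in this file as "v2/" + ctr.AuthUUID + "/" + val.String, is sketched here with a made-up UUID and secret.

```go
package main

import "fmt"

// v2Token sketches the federation-friendly token format used in the
// tests: "v2/<api_client_authorization UUID>/<secret>". The UUID's
// cluster-ID prefix is what lets a remote cluster route the token to
// its home (login) cluster for validation.
func v2Token(authUUID, secret string) string {
	return fmt.Sprintf("v2/%s/%s", authUUID, secret)
}

func main() {
	// Hypothetical UUID and secret, matching the "v2/z1111-gj3su-"
	// prefix asserted in TestFederatedApiClientAuthHandling below.
	fmt.Println(v2Token("z1111-gj3su-0123456789abcde", "examplesecret"))
}
```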
func (s *IntegrationSuite) TestForwardAnonymousTokenToLoginCluster(c *check.C) { - conn1 := s.testClusters["z1111"].Conn() - s.testClusters["z3333"].Conn() + conn1 := s.super.Conn("z1111") - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - _, anonac3, _ := s.testClusters["z3333"].AnonymousClients() + rootctx1, _, _ := s.super.RootClients("z1111") + _, anonac3, _ := s.super.AnonymousClients("z3333") // Make a user connection to z3333 (using a z1111 user, because that's the login cluster) - _, userac1, _, _ := s.testClusters["z3333"].UserClients(rootctx1, c, conn1, "user@example.com", true) + _, userac1, _, _ := s.super.UserClients("z3333", rootctx1, c, conn1, "user@example.com", true) // Get the anonymous user token for z3333 var anon3Auth arvados.APIClientAuthorization @@ -433,14 +423,14 @@ func (s *IntegrationSuite) TestForwardAnonymousTokenToLoginCluster(c *check.C) { // Get a token from the login cluster (z1111), use it to submit a // container request on z2222. func (s *IntegrationSuite) TestCreateContainerRequestWithFedToken(c *check.C) { - conn1 := s.testClusters["z1111"].Conn() - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - _, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + _, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) // Use ac2 to get the discovery doc with a blank token, so the // SDK doesn't magically pass the z1111 token to z2222 before // we're ready to start our test. - _, ac2, _ := s.testClusters["z2222"].ClientsWithToken("") + _, ac2, _ := s.super.ClientsWithToken("z2222", "") var dd map[string]interface{} err := ac2.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil) c.Assert(err, check.IsNil) @@ -502,9 +492,9 @@ func (s *IntegrationSuite) TestCreateContainerRequestWithFedToken(c *check.C) { } func (s *IntegrationSuite) TestCreateContainerRequestWithBadToken(c *check.C) { - conn1 := s.testClusters["z1111"].Conn() - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - _, ac1, _, au := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true) + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + _, ac1, _, au := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true) tests := []struct { name string @@ -539,9 +529,9 @@ func (s *IntegrationSuite) TestCreateContainerRequestWithBadToken(c *check.C) { } func (s *IntegrationSuite) TestRequestIDHeader(c *check.C) { - conn1 := s.testClusters["z1111"].Conn() - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - userctx1, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true) + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + userctx1, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true) coll, err := conn1.CollectionCreate(userctx1, arvados.CreateOptions{}) c.Check(err, check.IsNil) @@ -613,7 +603,7 @@ func (s *IntegrationSuite) TestRequestIDHeader(c *check.C) { // to test tokens that are secret, so there is no API response that will give them back func (s *IntegrationSuite) dbConn(c *check.C, clusterID string) (*sql.DB, *sql.Conn) { ctx := context.Background() - db, err := sql.Open("postgres", s.testClusters[clusterID].Super.Cluster().PostgreSQL.Connection.String()) + db, err := sql.Open("postgres", 
s.super.Cluster(clusterID).PostgreSQL.Connection.String()) c.Assert(err, check.IsNil) conn, err := db.Conn(ctx) @@ -633,9 +623,9 @@ func (s *IntegrationSuite) TestRuntimeTokenInCR(c *check.C) { db, dbconn := s.dbConn(c, "z1111") defer db.Close() defer dbconn.Close() - conn1 := s.testClusters["z1111"].Conn() - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - userctx1, ac1, _, au := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true) + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + userctx1, ac1, _, au := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true) tests := []struct { name string @@ -685,9 +675,9 @@ func (s *IntegrationSuite) TestRuntimeTokenInCR(c *check.C) { // one cluster with another cluster as the destination // and check the tokens are being handled properly func (s *IntegrationSuite) TestIntermediateCluster(c *check.C) { - conn1 := s.testClusters["z1111"].Conn() - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - uctx1, ac1, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true) + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + uctx1, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true) tests := []struct { name string @@ -717,20 +707,20 @@ func (s *IntegrationSuite) TestIntermediateCluster(c *check.C) { // Test for #17785 func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) { - rootctx1, rootclnt1, _ := s.testClusters["z1111"].RootClients() - conn1 := s.testClusters["z1111"].Conn() + rootctx1, rootclnt1, _ := s.super.RootClients("z1111") + conn1 := s.super.Conn("z1111") // Make sure LoginCluster is properly configured for _, cls := range []string{"z1111", "z3333"} { c.Check( - s.testClusters[cls].Config.Clusters[cls].Login.LoginCluster, + s.super.Cluster(cls).Login.LoginCluster, check.Equals, "z1111", check.Commentf("incorrect LoginCluster config on cluster %q", cls)) } // Get user's UUID & attempt to create a token for it on the remote cluster - _, _, _, user := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, + _, _, _, user := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true) - _, rootclnt3, _ := s.testClusters["z3333"].ClientsWithToken(rootclnt1.AuthToken) + _, rootclnt3, _ := s.super.ClientsWithToken("z3333", rootclnt1.AuthToken) var resp arvados.APIClientAuthorization err := rootclnt3.RequestAndDecode( &resp, "POST", "arvados/v1/api_client_authorizations", nil, @@ -749,7 +739,7 @@ func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) { c.Assert(strings.HasPrefix(newTok, "v2/z1111-gj3su-"), check.Equals, true) // Confirm the token works and is from the correct user - _, rootclnt3bis, _ := s.testClusters["z3333"].ClientsWithToken(newTok) + _, rootclnt3bis, _ := s.super.ClientsWithToken("z3333", newTok) var curUser arvados.User err = rootclnt3bis.RequestAndDecode( &curUser, "GET", "arvados/v1/users/current", nil, nil, @@ -758,7 +748,7 @@ func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) { c.Assert(curUser.UUID, check.Equals, user.UUID) // Request the ApiClientAuthorization list using the new token - _, userClient, _ := s.testClusters["z3333"].ClientsWithToken(newTok) + _, userClient, _ := s.super.ClientsWithToken("z3333", newTok) var acaLst arvados.APIClientAuthorizationList err = userClient.RequestAndDecode( &acaLst, "GET", "arvados/v1/api_client_authorizations", nil, nil, 
@@ -768,15 +758,15 @@ func (s *IntegrationSuite) TestFederatedApiClientAuthHandling(c *check.C) { // Test for bug #18076 func (s *IntegrationSuite) TestStaleCachedUserRecord(c *check.C) { - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - _, rootclnt3, _ := s.testClusters["z3333"].RootClients() - conn1 := s.testClusters["z1111"].Conn() - conn3 := s.testClusters["z3333"].Conn() + rootctx1, _, _ := s.super.RootClients("z1111") + _, rootclnt3, _ := s.super.RootClients("z3333") + conn1 := s.super.Conn("z1111") + conn3 := s.super.Conn("z3333") // Make sure LoginCluster is properly configured for _, cls := range []string{"z1111", "z3333"} { c.Check( - s.testClusters[cls].Config.Clusters[cls].Login.LoginCluster, + s.super.Cluster(cls).Login.LoginCluster, check.Equals, "z1111", check.Commentf("incorrect LoginCluster config on cluster %q", cls)) } @@ -792,7 +782,7 @@ func (s *IntegrationSuite) TestStaleCachedUserRecord(c *check.C) { // Create some users, request them on the federated cluster so they're cached. var users []arvados.User for userNr := 0; userNr < 2; userNr++ { - _, _, _, user := s.testClusters["z1111"].UserClients( + _, _, _, user := s.super.UserClients("z1111", rootctx1, c, conn1, @@ -871,15 +861,15 @@ func (s *IntegrationSuite) TestStaleCachedUserRecord(c *check.C) { // Test for bug #16263 func (s *IntegrationSuite) TestListUsers(c *check.C) { - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - conn1 := s.testClusters["z1111"].Conn() - conn3 := s.testClusters["z3333"].Conn() - userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) + rootctx1, _, _ := s.super.RootClients("z1111") + conn1 := s.super.Conn("z1111") + conn3 := s.super.Conn("z3333") + userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) // Make sure LoginCluster is properly configured - for cls := range s.testClusters { + for _, cls := range []string{"z1111", "z2222", "z3333"} { c.Check( - s.testClusters[cls].Config.Clusters[cls].Login.LoginCluster, + s.super.Cluster(cls).Login.LoginCluster, check.Equals, "z1111", check.Commentf("incorrect LoginCluster config on cluster %q", cls)) } @@ -939,12 +929,12 @@ func (s *IntegrationSuite) TestListUsers(c *check.C) { } func (s *IntegrationSuite) TestSetupUserWithVM(c *check.C) { - conn1 := s.testClusters["z1111"].Conn() - conn3 := s.testClusters["z3333"].Conn() - rootctx1, rootac1, _ := s.testClusters["z1111"].RootClients() + conn1 := s.super.Conn("z1111") + conn3 := s.super.Conn("z3333") + rootctx1, rootac1, _ := s.super.RootClients("z1111") // Create user on LoginCluster z1111 - _, _, _, user := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) + _, _, _, user := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) // Make a new root token (because rootClients() uses SystemRootToken) var outAuth arvados.APIClientAuthorization @@ -952,7 +942,7 @@ func (s *IntegrationSuite) TestSetupUserWithVM(c *check.C) { c.Check(err, check.IsNil) // Make a v2 root token to communicate with z3333 - rootctx3, rootac3, _ := s.testClusters["z3333"].ClientsWithToken(outAuth.TokenV2()) + rootctx3, rootac3, _ := s.super.ClientsWithToken("z3333", outAuth.TokenV2()) // Create VM on z3333 var outVM arvados.VirtualMachine @@ -1003,9 +993,9 @@ func (s *IntegrationSuite) TestSetupUserWithVM(c *check.C) { } func (s *IntegrationSuite) TestOIDCAccessTokenAuth(c *check.C) { - conn1 := s.testClusters["z1111"].Conn() - rootctx1, 
_, _ := s.testClusters["z1111"].RootClients() - s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) accesstoken := s.oidcprovider.ValidAccessToken() @@ -1017,8 +1007,8 @@ func (s *IntegrationSuite) TestOIDCAccessTokenAuth(c *check.C) { { c.Logf("save collection to %s", clusterID) - conn := s.testClusters[clusterID].Conn() - ctx, ac, kc := s.testClusters[clusterID].ClientsWithToken(accesstoken) + conn := s.super.Conn(clusterID) + ctx, ac, kc := s.super.ClientsWithToken(clusterID, accesstoken) fs, err := coll.FileSystem(ac, kc) c.Assert(err, check.IsNil) @@ -1042,8 +1032,8 @@ func (s *IntegrationSuite) TestOIDCAccessTokenAuth(c *check.C) { for _, readClusterID := range []string{"z1111", "z2222", "z3333"} { c.Logf("retrieve %s from %s", coll.UUID, readClusterID) - conn := s.testClusters[readClusterID].Conn() - ctx, ac, kc := s.testClusters[readClusterID].ClientsWithToken(accesstoken) + conn := s.super.Conn(readClusterID) + ctx, ac, kc := s.super.ClientsWithToken(readClusterID, accesstoken) user, err := conn.UserGetCurrent(ctx, arvados.GetOptions{}) c.Assert(err, check.IsNil) @@ -1071,11 +1061,11 @@ func (s *IntegrationSuite) TestForwardRuntimeTokenToLoginCluster(c *check.C) { db3, db3conn := s.dbConn(c, "z3333") defer db3.Close() defer db3conn.Close() - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - rootctx3, _, _ := s.testClusters["z3333"].RootClients() - conn1 := s.testClusters["z1111"].Conn() - conn3 := s.testClusters["z3333"].Conn() - userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, "user@example.com", true) + rootctx1, _, _ := s.super.RootClients("z1111") + rootctx3, _, _ := s.super.RootClients("z3333") + conn1 := s.super.Conn("z1111") + conn3 := s.super.Conn("z3333") + userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, "user@example.com", true) user1, err := conn1.UserGetCurrent(userctx1, arvados.GetOptions{}) c.Assert(err, check.IsNil) @@ -1113,7 +1103,7 @@ func (s *IntegrationSuite) TestForwardRuntimeTokenToLoginCluster(c *check.C) { row.Scan(&val) c.Assert(val.Valid, check.Equals, true) runtimeToken := "v2/" + ctr.AuthUUID + "/" + val.String - ctrctx, _, _ := s.testClusters["z3333"].ClientsWithToken(runtimeToken) + ctrctx, _, _ := s.super.ClientsWithToken("z3333", runtimeToken) c.Logf("container runtime token %+v", runtimeToken) _, err = conn3.UserGet(ctrctx, arvados.GetOptions{UUID: user1.UUID}) diff --git a/tools/sync-groups/federation_test.go b/tools/sync-groups/federation_test.go index d5fed3e29f..71c119156a 100644 --- a/tools/sync-groups/federation_test.go +++ b/tools/sync-groups/federation_test.go @@ -5,13 +5,12 @@ package main import ( - "bytes" + "context" "net" "os" - "path/filepath" + "time" "git.arvados.org/arvados.git/lib/boot" - "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadostest" "git.arvados.org/arvados.git/sdk/go/ctxlog" @@ -23,7 +22,7 @@ var _ = check.Suite(&FederationSuite{}) var origAPIHost, origAPIToken string type FederationSuite struct { - testClusters map[string]*boot.TestCluster + super *boot.Supervisor oidcprovider *arvadostest.OIDCProvider } @@ -31,8 +30,6 @@ func (s *FederationSuite) SetUpSuite(c *check.C) { origAPIHost = os.Getenv("ARVADOS_API_HOST") origAPIToken = os.Getenv("ARVADOS_API_TOKEN") - cwd, _ := os.Getwd() - 
s.oidcprovider = arvadostest.NewOIDCProvider(c) s.oidcprovider.AuthEmail = "user@example.com" s.oidcprovider.AuthEmailVerified = true @@ -40,12 +37,8 @@ func (s *FederationSuite) SetUpSuite(c *check.C) { s.oidcprovider.ValidClientID = "clientid" s.oidcprovider.ValidClientSecret = "clientsecret" - s.testClusters = map[string]*boot.TestCluster{ - "z1111": nil, - "z2222": nil, - } hostport := map[string]string{} - for id := range s.testClusters { + for _, id := range []string{"z1111", "z2222"} { hostport[id] = func() string { // TODO: Instead of expecting random ports on // 127.0.0.11, 22 to be race-safe, try @@ -59,8 +52,9 @@ func (s *FederationSuite) SetUpSuite(c *check.C) { return "127.0.0." + id[3:] + ":" + port }() } - for id := range s.testClusters { - yaml := `Clusters: + yaml := "Clusters:\n" + for id := range hostport { + yaml += ` ` + id + `: Services: Controller: @@ -104,30 +98,27 @@ func (s *FederationSuite) SetUpSuite(c *check.C) { LoginCluster: z1111 ` } - - loader := config.NewLoader(bytes.NewBufferString(yaml), ctxlog.TestLogger(c)) - loader.Path = "-" - loader.SkipLegacy = true - loader.SkipAPICalls = true - cfg, err := loader.Load() - c.Assert(err, check.IsNil) - tc := boot.NewTestCluster( - filepath.Join(cwd, "..", ".."), - id, cfg, "127.0.0."+id[3:], c.Log) - tc.Super.NoWorkbench1 = true - tc.Super.NoWorkbench2 = true - tc.Start() - s.testClusters[id] = tc } - for _, tc := range s.testClusters { - ok := tc.WaitReady() - c.Assert(ok, check.Equals, true) + s.super = &boot.Supervisor{ + ClusterType: "test", + ConfigYAML: yaml, + Stderr: ctxlog.LogWriter(c.Log), + NoWorkbench1: true, + NoWorkbench2: true, + OwnTemporaryDatabase: true, } + // Give up if startup takes longer than 3m + timeout := time.AfterFunc(3*time.Minute, s.super.Stop) + defer timeout.Stop() + s.super.Start(context.Background()) + ok := s.super.WaitReady() + c.Assert(ok, check.Equals, true) + // Activate user, make it admin. 
- conn1 := s.testClusters["z1111"].Conn() - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) user1, err := conn1.UserGetCurrent(userctx1, arvados.GetOptions{}) c.Assert(err, check.IsNil) c.Assert(user1.IsAdmin, check.Equals, false) @@ -142,25 +133,23 @@ func (s *FederationSuite) SetUpSuite(c *check.C) { } func (s *FederationSuite) TearDownSuite(c *check.C) { - for _, c := range s.testClusters { - c.Super.Stop() - } + s.super.Stop() _ = os.Setenv("ARVADOS_API_HOST", origAPIHost) _ = os.Setenv("ARVADOS_API_TOKEN", origAPIToken) } func (s *FederationSuite) TestGroupSyncingOnFederatedCluster(c *check.C) { // Get admin user's V2 token - conn1 := s.testClusters["z1111"].Conn() - rootctx1, _, _ := s.testClusters["z1111"].RootClients() - userctx1, _, _, _ := s.testClusters["z1111"].UserClients(rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + userctx1, _, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) user1Auth, err := conn1.APIClientAuthorizationCurrent(userctx1, arvados.GetOptions{}) c.Check(err, check.IsNil) userV2Token := user1Auth.TokenV2() // Get federated admin clients on z2222 to set up environment - conn2 := s.testClusters["z2222"].Conn() - userctx2, userac2, _ := s.testClusters["z2222"].ClientsWithToken(userV2Token) + conn2 := s.super.Conn("z2222") + userctx2, userac2, _ := s.super.ClientsWithToken("z2222", userV2Token) user2, err := conn2.UserGetCurrent(userctx2, arvados.GetOptions{}) c.Check(err, check.IsNil) c.Check(user2.IsAdmin, check.Equals, true) @@ -177,7 +166,7 @@ func (s *FederationSuite) TestGroupSyncingOnFederatedCluster(c *check.C) { Filters: []arvados.Filter{{ Attr: "owner_uuid", Operator: "=", - Operand: s.testClusters["z2222"].ClusterID + "-tpzed-000000000000000", + Operand: s.super.Cluster("z2222").ClusterID + "-tpzed-000000000000000", }, { Attr: "name", Operator: "=", -- 2.30.2
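Illustration (not part of the patch): with this change, arvados-server boot prints one controller URL per cluster on stdout once everything is ready. A sketch of a calling program consuming that output; the config path and flag values are examples.

```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"os/exec"
)

func main() {
	// Example invocation; -config may now point at a file that
	// defines several clusters.
	cmd := exec.Command("arvados-server", "boot",
		"-type", "test",
		"-config", "/tmp/multicluster.yml")
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Fatal(err)
	}
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	// One line per cluster: that cluster's controller ExternalURL.
	// (The boot supervisor keeps running until it is signaled, so
	// this loop ends only when the child process exits.)
	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		fmt.Println("controller ready:", scanner.Text())
	}
	if err := cmd.Wait(); err != nil {
		log.Fatal(err)
	}
}
```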