18071: Use dblock to prevent multiple dispatchers from competing.
authorTom Clegg <tom@curii.com>
Fri, 28 Oct 2022 14:35:27 +0000 (10:35 -0400)
committerTom Clegg <tom@curii.com>
Fri, 28 Oct 2022 14:35:27 +0000 (10:35 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/controller/dblock/dblock.go
lib/controller/federation.go
lib/controller/handler.go
lib/controller/handler_test.go
lib/controller/trash.go
lib/ctrlctx/db.go
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/dispatcher_test.go
lib/lsf/dispatch.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go

index 6f5db10066db100063f2b60682144eef1e35a2b5..ad2733abfa36df82c72c4aa3c7a6c090c6496efb 100644 (file)
@@ -21,6 +21,7 @@ var (
        ContainerLogSweep  = &DBLocker{key: 10002}
        KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
        KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
        ContainerLogSweep  = &DBLocker{key: 10002}
        KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
        KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+       Dispatch           = &DBLocker{key: 10005} // any dispatcher running
        retryDelay         = 5 * time.Second
 )
 
        retryDelay         = 5 * time.Second
 )
 
index e7d6e29b88c1f683f981a1ee5df2b53cf7c862af..93b8315a63be588a0b3e2e1b3182337e68defeff 100644 (file)
@@ -142,7 +142,7 @@ type CurrentUser struct {
 // non-nil, true, nil -- if the token is valid
 func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, bool, error) {
        user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
 // non-nil, true, nil -- if the token is valid
 func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, bool, error) {
        user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
-       db, err := h.db(req.Context())
+       db, err := h.dbConnector.GetDB(req.Context())
        if err != nil {
                ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): database error", token)
                return nil, false, err
        if err != nil {
                ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): database error", token)
                return nil, false, err
@@ -179,7 +179,7 @@ func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUse
 }
 
 func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
 }
 
 func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
-       db, err := h.db(req.Context())
+       db, err := h.dbConnector.GetDB(req.Context())
        if err != nil {
                return nil, err
        }
        if err != nil {
                return nil, err
        }
index e1392bef92652bfbf8a61155a062752608809495..4c6fca7f77276c3981c591d18429d8520d3e76b7 100644 (file)
@@ -6,7 +6,6 @@ package controller
 
 import (
        "context"
 
 import (
        "context"
-       "errors"
        "fmt"
        "net/http"
        "net/http/httptest"
        "fmt"
        "net/http"
        "net/http/httptest"
@@ -21,10 +20,8 @@ import (
        "git.arvados.org/arvados.git/lib/controller/router"
        "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/lib/controller/router"
        "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "git.arvados.org/arvados.git/sdk/go/health"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
-       "github.com/jmoiron/sqlx"
 
        // sqlx needs lib/pq to talk to PostgreSQL
        _ "github.com/lib/pq"
 
        // sqlx needs lib/pq to talk to PostgreSQL
        _ "github.com/lib/pq"
@@ -40,8 +37,7 @@ type Handler struct {
        proxy          *proxy
        secureClient   *http.Client
        insecureClient *http.Client
        proxy          *proxy
        secureClient   *http.Client
        insecureClient *http.Client
-       pgdb           *sqlx.DB
-       pgdbMtx        sync.Mutex
+       dbConnector    ctrlctx.DBConnector
 }
 
 func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 }
 
 func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -65,7 +61,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 
 func (h *Handler) CheckHealth() error {
        h.setupOnce.Do(h.setup)
 
 func (h *Handler) CheckHealth() error {
        h.setupOnce.Do(h.setup)
-       _, err := h.db(context.TODO())
+       _, err := h.dbConnector.GetDB(context.TODO())
        if err != nil {
                return err
        }
        if err != nil {
                return err
        }
@@ -97,17 +93,18 @@ func (h *Handler) setup() {
        mux := http.NewServeMux()
        healthFuncs := make(map[string]health.Func)
 
        mux := http.NewServeMux()
        healthFuncs := make(map[string]health.Func)
 
-       oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
+       h.dbConnector = ctrlctx.DBConnector{PostgreSQL: h.Cluster.PostgreSQL}
+       oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.dbConnector.GetDB)
        h.federation = federation.New(h.Cluster, &healthFuncs)
        rtr := router.New(h.federation, router.Config{
                MaxRequestSize: h.Cluster.API.MaxRequestSize,
                WrapCalls: api.ComposeWrappers(
        h.federation = federation.New(h.Cluster, &healthFuncs)
        rtr := router.New(h.federation, router.Config{
                MaxRequestSize: h.Cluster.API.MaxRequestSize,
                WrapCalls: api.ComposeWrappers(
-                       ctrlctx.WrapCallsInTransactions(h.db),
+                       ctrlctx.WrapCallsInTransactions(h.dbConnector.GetDB),
                        oidcAuthorizer.WrapCalls,
                        ctrlctx.WrapCallsWithAuth(h.Cluster)),
        })
 
                        oidcAuthorizer.WrapCalls,
                        ctrlctx.WrapCallsWithAuth(h.Cluster)),
        })
 
-       healthRoutes := health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }}
+       healthRoutes := health.Routes{"ping": func() error { _, err := h.dbConnector.GetDB(context.TODO()); return err }}
        for name, f := range healthFuncs {
                healthRoutes[name] = f
        }
        for name, f := range healthFuncs {
                healthRoutes[name] = f
        }
@@ -158,31 +155,6 @@ func (h *Handler) setup() {
        go h.containerLogSweepWorker()
 }
 
        go h.containerLogSweepWorker()
 }
 
-var errDBConnection = errors.New("database connection error")
-
-func (h *Handler) db(ctx context.Context) (*sqlx.DB, error) {
-       h.pgdbMtx.Lock()
-       defer h.pgdbMtx.Unlock()
-       if h.pgdb != nil {
-               return h.pgdb, nil
-       }
-
-       db, err := sqlx.Open("postgres", h.Cluster.PostgreSQL.Connection.String())
-       if err != nil {
-               ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
-               return nil, errDBConnection
-       }
-       if p := h.Cluster.PostgreSQL.ConnectionPool; p > 0 {
-               db.SetMaxOpenConns(p)
-       }
-       if err := db.Ping(); err != nil {
-               ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
-               return nil, errDBConnection
-       }
-       h.pgdb = db
-       return db, nil
-}
-
 type middlewareFunc func(http.ResponseWriter, *http.Request, http.Handler)
 
 func prepend(next http.Handler, middleware middlewareFunc) http.Handler {
 type middlewareFunc func(http.ResponseWriter, *http.Request, http.Handler)
 
 func prepend(next http.Handler, middleware middlewareFunc) http.Handler {
index 0ffe0255f5c59ca6ae40930bd994badf0672d05e..c9999fa28c38d6d02645a2afa4d5be94f8fe75a8 100644 (file)
@@ -477,7 +477,7 @@ func (s *HandlerSuite) TestTrashSweep(c *check.C) {
        coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true})
        c.Assert(err, check.IsNil)
        defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID})
        coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true})
        c.Assert(err, check.IsNil)
        defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID})
-       db, err := s.handler.db(s.ctx)
+       db, err := s.handler.dbConnector.GetDB(s.ctx)
        c.Assert(err, check.IsNil)
        _, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID)
        c.Assert(err, check.IsNil)
        c.Assert(err, check.IsNil)
        _, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID)
        c.Assert(err, check.IsNil)
@@ -550,7 +550,7 @@ func (s *HandlerSuite) TestLogActivity(c *check.C) {
                        c.Assert(err, check.IsNil)
                }
        }
                        c.Assert(err, check.IsNil)
                }
        }
-       db, err := s.handler.db(s.ctx)
+       db, err := s.handler.dbConnector.GetDB(s.ctx)
        c.Assert(err, check.IsNil)
        for _, userUUID := range []string{arvadostest.ActiveUserUUID, arvadostest.SpectatorUserUUID} {
                var rows int
        c.Assert(err, check.IsNil)
        for _, userUUID := range []string{arvadostest.ActiveUserUUID, arvadostest.SpectatorUserUUID} {
                var rows int
index afdf95b782647b2f5f2b372eaaadb7a888fb1936..99e7aec0b66c4dbed9462c498ece37e38971157b 100644 (file)
@@ -20,7 +20,7 @@ func (h *Handler) periodicWorker(workerName string, interval time.Duration, lock
                logger.Debugf("interval is %v, not running worker", interval)
                return
        }
                logger.Debugf("interval is %v, not running worker", interval)
                return
        }
-       if !locker.Lock(ctx, h.db) {
+       if !locker.Lock(ctx, h.dbConnector.GetDB) {
                // context canceled
                return
        }
                // context canceled
                return
        }
@@ -47,7 +47,7 @@ func (h *Handler) trashSweepWorker() {
 
 func (h *Handler) containerLogSweepWorker() {
        h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
 
 func (h *Handler) containerLogSweepWorker() {
        h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
-               db, err := h.db(ctx)
+               db, err := h.dbConnector.GetDB(ctx)
                if err != nil {
                        return err
                }
                if err != nil {
                        return err
                }
index a76420860604b9a6fb9823bdc6b3775c70f85ff4..2a05096ce18b7430e7e1e487dd5d710024ac9193 100644 (file)
@@ -10,6 +10,7 @@ import (
        "sync"
 
        "git.arvados.org/arvados.git/lib/controller/api"
        "sync"
 
        "git.arvados.org/arvados.git/lib/controller/api"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/jmoiron/sqlx"
 
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/jmoiron/sqlx"
 
@@ -142,3 +143,33 @@ func CurrentTx(ctx context.Context) (*sqlx.Tx, error) {
        })
        return txn.tx, txn.err
 }
        })
        return txn.tx, txn.err
 }
+
+var errDBConnection = errors.New("database connection error")
+
+type DBConnector struct {
+       PostgreSQL arvados.PostgreSQL
+       pgdb       *sqlx.DB
+       mtx        sync.Mutex
+}
+
+func (dbc *DBConnector) GetDB(ctx context.Context) (*sqlx.DB, error) {
+       dbc.mtx.Lock()
+       defer dbc.mtx.Unlock()
+       if dbc.pgdb != nil {
+               return dbc.pgdb, nil
+       }
+       db, err := sqlx.Open("postgres", dbc.PostgreSQL.Connection.String())
+       if err != nil {
+               ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
+               return nil, errDBConnection
+       }
+       if p := dbc.PostgreSQL.ConnectionPool; p > 0 {
+               db.SetMaxOpenConns(p)
+       }
+       if err := db.Ping(); err != nil {
+               ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
+               return nil, errDBConnection
+       }
+       dbc.pgdb = db
+       return db, nil
+}
index ae91a710e395295f47a34cb5645f980021e79021..3403c50c972987e7f6f21a927a6db592fac9f6fc 100644 (file)
@@ -15,6 +15,8 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/lib/dispatchcloud/container"
        "git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
        "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
        "git.arvados.org/arvados.git/lib/dispatchcloud/container"
        "git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
        "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
@@ -53,6 +55,7 @@ type dispatcher struct {
        Registry      *prometheus.Registry
        InstanceSetID cloud.InstanceSetID
 
        Registry      *prometheus.Registry
        InstanceSetID cloud.InstanceSetID
 
+       dbConnector ctrlctx.DBConnector
        logger      logrus.FieldLogger
        instanceSet cloud.InstanceSet
        pool        pool
        logger      logrus.FieldLogger
        instanceSet cloud.InstanceSet
        pool        pool
@@ -118,6 +121,7 @@ func (disp *dispatcher) setup() {
 
 func (disp *dispatcher) initialize() {
        disp.logger = ctxlog.FromContext(disp.Context)
 
 func (disp *dispatcher) initialize() {
        disp.logger = ctxlog.FromContext(disp.Context)
+       disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
 
        disp.ArvClient.AuthToken = disp.AuthToken
 
 
        disp.ArvClient.AuthToken = disp.AuthToken
 
@@ -143,6 +147,7 @@ func (disp *dispatcher) initialize() {
        if err != nil {
                disp.logger.Fatalf("error initializing driver: %s", err)
        }
        if err != nil {
                disp.logger.Fatalf("error initializing driver: %s", err)
        }
+       dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
        disp.instanceSet = instanceSet
        disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
        disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
        disp.instanceSet = instanceSet
        disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
        disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
@@ -175,6 +180,7 @@ func (disp *dispatcher) initialize() {
 }
 
 func (disp *dispatcher) run() {
 }
 
 func (disp *dispatcher) run() {
+       defer dblock.Dispatch.Unlock()
        defer close(disp.stopped)
        defer disp.instanceSet.Stop()
        defer disp.pool.Stop()
        defer close(disp.stopped)
        defer disp.instanceSet.Stop()
        defer disp.pool.Stop()
index 829a053636d5dc07abaac1c649810c5416e09fb6..bdbdf1ef9d83f99a0fad85d829f19064eb1924d3 100644 (file)
@@ -15,6 +15,7 @@ import (
        "sync"
        "time"
 
        "sync"
        "time"
 
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/dispatchcloud/test"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "git.arvados.org/arvados.git/lib/dispatchcloud/test"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
@@ -49,8 +50,16 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                MinTimeBetweenCreateCalls: time.Millisecond,
        }
 
                MinTimeBetweenCreateCalls: time.Millisecond,
        }
 
+       // We need the postgresql connection info from the integration
+       // test config.
+       cfg, err := config.NewLoader(nil, ctxlog.FromContext(s.ctx)).Load()
+       c.Assert(err, check.IsNil)
+       testcluster, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+
        s.cluster = &arvados.Cluster{
                ManagementToken: "test-management-token",
        s.cluster = &arvados.Cluster{
                ManagementToken: "test-management-token",
+               PostgreSQL:      testcluster.PostgreSQL,
                Containers: arvados.ContainersConfig{
                        CrunchRunCommand:       "crunch-run",
                        CrunchRunArgumentsList: []string{"--foo", "--extra='args'"},
                Containers: arvados.ContainersConfig{
                        CrunchRunCommand:       "crunch-run",
                        CrunchRunArgumentsList: []string{"--foo", "--extra='args'"},
index d362f66d14b3ee12b9a4fb6b197b9a34747d944c..d1408d23cb1a4e3c2274f40d2f02b66bda29e82d 100644 (file)
@@ -18,6 +18,8 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -58,6 +60,7 @@ type dispatcher struct {
        Registry  *prometheus.Registry
 
        logger        logrus.FieldLogger
        Registry  *prometheus.Registry
 
        logger        logrus.FieldLogger
+       dbConnector   ctrlctx.DBConnector
        lsfcli        lsfcli
        lsfqueue      lsfqueue
        arvDispatcher *dispatch.Dispatcher
        lsfcli        lsfcli
        lsfqueue      lsfqueue
        arvDispatcher *dispatch.Dispatcher
@@ -73,7 +76,9 @@ type dispatcher struct {
 func (disp *dispatcher) Start() {
        disp.initOnce.Do(func() {
                disp.init()
 func (disp *dispatcher) Start() {
        disp.initOnce.Do(func() {
                disp.init()
+               dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
                go func() {
                go func() {
+                       defer dblock.Dispatch.Unlock()
                        disp.checkLsfQueueForOrphans()
                        err := disp.arvDispatcher.Run(disp.Context)
                        if err != nil {
                        disp.checkLsfQueueForOrphans()
                        err := disp.arvDispatcher.Run(disp.Context)
                        if err != nil {
@@ -125,6 +130,7 @@ func (disp *dispatcher) init() {
                lsfcli: &disp.lsfcli,
        }
        disp.ArvClient.AuthToken = disp.AuthToken
                lsfcli: &disp.lsfcli,
        }
        disp.ArvClient.AuthToken = disp.AuthToken
+       disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
        disp.stop = make(chan struct{}, 1)
        disp.stopped = make(chan struct{})
 
        disp.stop = make(chan struct{}, 1)
        disp.stopped = make(chan struct{})
 
index ac394e114962ddf05d2e71e94cc4bb1ff46c4780..1c0f6ad28f5ba7b6d20bcc0cadbef0fb87fec634 100644 (file)
@@ -19,6 +19,8 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -55,10 +57,11 @@ const initialNiceValue int64 = 10000
 
 type Dispatcher struct {
        *dispatch.Dispatcher
 
 type Dispatcher struct {
        *dispatch.Dispatcher
-       logger  logrus.FieldLogger
-       cluster *arvados.Cluster
-       sqCheck *SqueueChecker
-       slurm   Slurm
+       logger      logrus.FieldLogger
+       cluster     *arvados.Cluster
+       sqCheck     *SqueueChecker
+       slurm       Slurm
+       dbConnector ctrlctx.DBConnector
 
        done chan struct{}
        err  error
 
        done chan struct{}
        err  error
@@ -90,6 +93,7 @@ func (disp *Dispatcher) configure() error {
        disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
        disp.Client.AuthToken = disp.cluster.SystemRootToken
        disp.Client.Insecure = disp.cluster.TLS.Insecure
        disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
        disp.Client.AuthToken = disp.cluster.SystemRootToken
        disp.Client.Insecure = disp.cluster.TLS.Insecure
+       disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.cluster.PostgreSQL}
 
        if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
                // Copy real configs into env vars so [a]
 
        if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
                // Copy real configs into env vars so [a]
@@ -137,6 +141,8 @@ func (disp *Dispatcher) setup() {
 }
 
 func (disp *Dispatcher) run() error {
 }
 
 func (disp *Dispatcher) run() error {
+       dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
+       defer dblock.Dispatch.Unlock()
        defer disp.sqCheck.Stop()
 
        if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
        defer disp.sqCheck.Stop()
 
        if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {