From 65b12213f740b117fb14822bce0dbb415257c355 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Fri, 28 Oct 2022 10:35:27 -0400 Subject: [PATCH] 18071: Use dblock to prevent multiple dispatchers from competing. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/controller/dblock/dblock.go | 1 + lib/controller/federation.go | 4 +- lib/controller/handler.go | 40 +++---------------- lib/controller/handler_test.go | 4 +- lib/controller/trash.go | 4 +- lib/ctrlctx/db.go | 31 ++++++++++++++ lib/dispatchcloud/dispatcher.go | 6 +++ lib/dispatchcloud/dispatcher_test.go | 9 +++++ lib/lsf/dispatch.go | 6 +++ .../crunch-dispatch-slurm.go | 14 +++++-- 10 files changed, 75 insertions(+), 44 deletions(-) diff --git a/lib/controller/dblock/dblock.go b/lib/controller/dblock/dblock.go index 6f5db10066..ad2733abfa 100644 --- a/lib/controller/dblock/dblock.go +++ b/lib/controller/dblock/dblock.go @@ -21,6 +21,7 @@ var ( ContainerLogSweep = &DBLocker{key: 10002} KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop KeepBalanceActive = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop) + Dispatch = &DBLocker{key: 10005} // any dispatcher running retryDelay = 5 * time.Second ) diff --git a/lib/controller/federation.go b/lib/controller/federation.go index e7d6e29b88..93b8315a63 100644 --- a/lib/controller/federation.go +++ b/lib/controller/federation.go @@ -142,7 +142,7 @@ type CurrentUser struct { // non-nil, true, nil -- if the token is valid func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, bool, error) { user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}} - db, err := h.db(req.Context()) + db, err := h.dbConnector.GetDB(req.Context()) if err != nil { ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): database error", token) return nil, false, err @@ -179,7 +179,7 @@ func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUse } func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) { - db, err := h.db(req.Context()) + db, err := h.dbConnector.GetDB(req.Context()) if err != nil { return nil, err } diff --git a/lib/controller/handler.go b/lib/controller/handler.go index e1392bef92..4c6fca7f77 100644 --- a/lib/controller/handler.go +++ b/lib/controller/handler.go @@ -6,7 +6,6 @@ package controller import ( "context" - "errors" "fmt" "net/http" "net/http/httptest" @@ -21,10 +20,8 @@ import ( "git.arvados.org/arvados.git/lib/controller/router" "git.arvados.org/arvados.git/lib/ctrlctx" "git.arvados.org/arvados.git/sdk/go/arvados" - "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/health" "git.arvados.org/arvados.git/sdk/go/httpserver" - "github.com/jmoiron/sqlx" // sqlx needs lib/pq to talk to PostgreSQL _ "github.com/lib/pq" @@ -40,8 +37,7 @@ type Handler struct { proxy *proxy secureClient *http.Client insecureClient *http.Client - pgdb *sqlx.DB - pgdbMtx sync.Mutex + dbConnector ctrlctx.DBConnector } func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -65,7 +61,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (h *Handler) CheckHealth() error { h.setupOnce.Do(h.setup) - _, err := h.db(context.TODO()) + _, err := h.dbConnector.GetDB(context.TODO()) if err != nil { return err } @@ -97,17 +93,18 @@ func (h *Handler) setup() { mux := http.NewServeMux() healthFuncs := make(map[string]health.Func) - oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db) + h.dbConnector = ctrlctx.DBConnector{PostgreSQL: h.Cluster.PostgreSQL} + oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.dbConnector.GetDB) h.federation = federation.New(h.Cluster, &healthFuncs) rtr := router.New(h.federation, router.Config{ MaxRequestSize: h.Cluster.API.MaxRequestSize, WrapCalls: api.ComposeWrappers( - ctrlctx.WrapCallsInTransactions(h.db), + ctrlctx.WrapCallsInTransactions(h.dbConnector.GetDB), oidcAuthorizer.WrapCalls, ctrlctx.WrapCallsWithAuth(h.Cluster)), }) - healthRoutes := health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }} + healthRoutes := health.Routes{"ping": func() error { _, err := h.dbConnector.GetDB(context.TODO()); return err }} for name, f := range healthFuncs { healthRoutes[name] = f } @@ -158,31 +155,6 @@ func (h *Handler) setup() { go h.containerLogSweepWorker() } -var errDBConnection = errors.New("database connection error") - -func (h *Handler) db(ctx context.Context) (*sqlx.DB, error) { - h.pgdbMtx.Lock() - defer h.pgdbMtx.Unlock() - if h.pgdb != nil { - return h.pgdb, nil - } - - db, err := sqlx.Open("postgres", h.Cluster.PostgreSQL.Connection.String()) - if err != nil { - ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed") - return nil, errDBConnection - } - if p := h.Cluster.PostgreSQL.ConnectionPool; p > 0 { - db.SetMaxOpenConns(p) - } - if err := db.Ping(); err != nil { - ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed") - return nil, errDBConnection - } - h.pgdb = db - return db, nil -} - type middlewareFunc func(http.ResponseWriter, *http.Request, http.Handler) func prepend(next http.Handler, middleware middlewareFunc) http.Handler { diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go index 0ffe0255f5..c9999fa28c 100644 --- a/lib/controller/handler_test.go +++ b/lib/controller/handler_test.go @@ -477,7 +477,7 @@ func (s *HandlerSuite) TestTrashSweep(c *check.C) { coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true}) c.Assert(err, check.IsNil) defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID}) - db, err := s.handler.db(s.ctx) + db, err := s.handler.dbConnector.GetDB(s.ctx) c.Assert(err, check.IsNil) _, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID) c.Assert(err, check.IsNil) @@ -550,7 +550,7 @@ func (s *HandlerSuite) TestLogActivity(c *check.C) { c.Assert(err, check.IsNil) } } - db, err := s.handler.db(s.ctx) + db, err := s.handler.dbConnector.GetDB(s.ctx) c.Assert(err, check.IsNil) for _, userUUID := range []string{arvadostest.ActiveUserUUID, arvadostest.SpectatorUserUUID} { var rows int diff --git a/lib/controller/trash.go b/lib/controller/trash.go index afdf95b782..99e7aec0b6 100644 --- a/lib/controller/trash.go +++ b/lib/controller/trash.go @@ -20,7 +20,7 @@ func (h *Handler) periodicWorker(workerName string, interval time.Duration, lock logger.Debugf("interval is %v, not running worker", interval) return } - if !locker.Lock(ctx, h.db) { + if !locker.Lock(ctx, h.dbConnector.GetDB) { // context canceled return } @@ -47,7 +47,7 @@ func (h *Handler) trashSweepWorker() { func (h *Handler) containerLogSweepWorker() { h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error { - db, err := h.db(ctx) + db, err := h.dbConnector.GetDB(ctx) if err != nil { return err } diff --git a/lib/ctrlctx/db.go b/lib/ctrlctx/db.go index a764208606..2a05096ce1 100644 --- a/lib/ctrlctx/db.go +++ b/lib/ctrlctx/db.go @@ -10,6 +10,7 @@ import ( "sync" "git.arvados.org/arvados.git/lib/controller/api" + "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "github.com/jmoiron/sqlx" @@ -142,3 +143,33 @@ func CurrentTx(ctx context.Context) (*sqlx.Tx, error) { }) return txn.tx, txn.err } + +var errDBConnection = errors.New("database connection error") + +type DBConnector struct { + PostgreSQL arvados.PostgreSQL + pgdb *sqlx.DB + mtx sync.Mutex +} + +func (dbc *DBConnector) GetDB(ctx context.Context) (*sqlx.DB, error) { + dbc.mtx.Lock() + defer dbc.mtx.Unlock() + if dbc.pgdb != nil { + return dbc.pgdb, nil + } + db, err := sqlx.Open("postgres", dbc.PostgreSQL.Connection.String()) + if err != nil { + ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed") + return nil, errDBConnection + } + if p := dbc.PostgreSQL.ConnectionPool; p > 0 { + db.SetMaxOpenConns(p) + } + if err := db.Ping(); err != nil { + ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed") + return nil, errDBConnection + } + dbc.pgdb = db + return db, nil +} diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go index ae91a710e3..3403c50c97 100644 --- a/lib/dispatchcloud/dispatcher.go +++ b/lib/dispatchcloud/dispatcher.go @@ -15,6 +15,8 @@ import ( "time" "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/controller/dblock" + "git.arvados.org/arvados.git/lib/ctrlctx" "git.arvados.org/arvados.git/lib/dispatchcloud/container" "git.arvados.org/arvados.git/lib/dispatchcloud/scheduler" "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor" @@ -53,6 +55,7 @@ type dispatcher struct { Registry *prometheus.Registry InstanceSetID cloud.InstanceSetID + dbConnector ctrlctx.DBConnector logger logrus.FieldLogger instanceSet cloud.InstanceSet pool pool @@ -118,6 +121,7 @@ func (disp *dispatcher) setup() { func (disp *dispatcher) initialize() { disp.logger = ctxlog.FromContext(disp.Context) + disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL} disp.ArvClient.AuthToken = disp.AuthToken @@ -143,6 +147,7 @@ func (disp *dispatcher) initialize() { if err != nil { disp.logger.Fatalf("error initializing driver: %s", err) } + dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB) disp.instanceSet = instanceSet disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster) disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient) @@ -175,6 +180,7 @@ func (disp *dispatcher) initialize() { } func (disp *dispatcher) run() { + defer dblock.Dispatch.Unlock() defer close(disp.stopped) defer disp.instanceSet.Stop() defer disp.pool.Stop() diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go index 829a053636..bdbdf1ef9d 100644 --- a/lib/dispatchcloud/dispatcher_test.go +++ b/lib/dispatchcloud/dispatcher_test.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/lib/dispatchcloud/test" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadostest" @@ -49,8 +50,16 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { MinTimeBetweenCreateCalls: time.Millisecond, } + // We need the postgresql connection info from the integration + // test config. + cfg, err := config.NewLoader(nil, ctxlog.FromContext(s.ctx)).Load() + c.Assert(err, check.IsNil) + testcluster, err := cfg.GetCluster("") + c.Assert(err, check.IsNil) + s.cluster = &arvados.Cluster{ ManagementToken: "test-management-token", + PostgreSQL: testcluster.PostgreSQL, Containers: arvados.ContainersConfig{ CrunchRunCommand: "crunch-run", CrunchRunArgumentsList: []string{"--foo", "--extra='args'"}, diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go index d362f66d14..d1408d23cb 100644 --- a/lib/lsf/dispatch.go +++ b/lib/lsf/dispatch.go @@ -18,6 +18,8 @@ import ( "time" "git.arvados.org/arvados.git/lib/cmd" + "git.arvados.org/arvados.git/lib/controller/dblock" + "git.arvados.org/arvados.git/lib/ctrlctx" "git.arvados.org/arvados.git/lib/dispatchcloud" "git.arvados.org/arvados.git/lib/service" "git.arvados.org/arvados.git/sdk/go/arvados" @@ -58,6 +60,7 @@ type dispatcher struct { Registry *prometheus.Registry logger logrus.FieldLogger + dbConnector ctrlctx.DBConnector lsfcli lsfcli lsfqueue lsfqueue arvDispatcher *dispatch.Dispatcher @@ -73,7 +76,9 @@ type dispatcher struct { func (disp *dispatcher) Start() { disp.initOnce.Do(func() { disp.init() + dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB) go func() { + defer dblock.Dispatch.Unlock() disp.checkLsfQueueForOrphans() err := disp.arvDispatcher.Run(disp.Context) if err != nil { @@ -125,6 +130,7 @@ func (disp *dispatcher) init() { lsfcli: &disp.lsfcli, } disp.ArvClient.AuthToken = disp.AuthToken + disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL} disp.stop = make(chan struct{}, 1) disp.stopped = make(chan struct{}) diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index ac394e1149..1c0f6ad28f 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -19,6 +19,8 @@ import ( "time" "git.arvados.org/arvados.git/lib/cmd" + "git.arvados.org/arvados.git/lib/controller/dblock" + "git.arvados.org/arvados.git/lib/ctrlctx" "git.arvados.org/arvados.git/lib/dispatchcloud" "git.arvados.org/arvados.git/lib/service" "git.arvados.org/arvados.git/sdk/go/arvados" @@ -55,10 +57,11 @@ const initialNiceValue int64 = 10000 type Dispatcher struct { *dispatch.Dispatcher - logger logrus.FieldLogger - cluster *arvados.Cluster - sqCheck *SqueueChecker - slurm Slurm + logger logrus.FieldLogger + cluster *arvados.Cluster + sqCheck *SqueueChecker + slurm Slurm + dbConnector ctrlctx.DBConnector done chan struct{} err error @@ -90,6 +93,7 @@ func (disp *Dispatcher) configure() error { disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host disp.Client.AuthToken = disp.cluster.SystemRootToken disp.Client.Insecure = disp.cluster.TLS.Insecure + disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.cluster.PostgreSQL} if disp.Client.APIHost != "" || disp.Client.AuthToken != "" { // Copy real configs into env vars so [a] @@ -137,6 +141,8 @@ func (disp *Dispatcher) setup() { } func (disp *Dispatcher) run() error { + dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB) + defer dblock.Dispatch.Unlock() defer disp.sqCheck.Stop() if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 { -- 2.30.2