18339: Extract dblocker to a package.
author Tom Clegg <tom@curii.com>
Tue, 16 Nov 2021 16:06:23 +0000 (11:06 -0500)
committer Tom Clegg <tom@curii.com>
Tue, 16 Nov 2021 16:06:23 +0000 (11:06 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/controller/dblock/dblock.go [new file with mode: 0644]
lib/controller/trash.go [new file with mode: 0644]
lib/controller/worker.go [deleted file]

diff --git a/lib/controller/dblock/dblock.go b/lib/controller/dblock/dblock.go
new file mode 100644
index 0000000..b0d3488
--- /dev/null
+++ b/lib/controller/dblock/dblock.go
@@ -0,0 +1,101 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dblock
+
+import (
+       "context"
+       "database/sql"
+       "sync"
+       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/jmoiron/sqlx"
+)
+
+var (
+       TrashSweep = &DBLocker{key: 10001}
+       retryDelay = 5 * time.Second
+)
+
+// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
+// a long-running task like "do X every N seconds".
+type DBLocker struct {
+       key   int
+       mtx   sync.Mutex
+       ctx   context.Context
+       getdb func(context.Context) (*sqlx.DB, error)
+       conn  *sql.Conn // != nil if advisory lock has been acquired
+}
+
+// Lock acquires the advisory lock, waiting/reconnecting if needed.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
+       logger := ctxlog.FromContext(ctx)
+       for ; ; time.Sleep(retryDelay) {
+               dbl.mtx.Lock()
+               if dbl.conn != nil {
+                       // Already locked by another caller in this
+                       // process. Wait for them to release.
+                       dbl.mtx.Unlock()
+                       continue
+               }
+               db, err := getdb(ctx)
+               if err != nil {
+                       logger.WithError(err).Infof("error getting database pool")
+                       dbl.mtx.Unlock()
+                       continue
+               }
+               conn, err := db.Conn(ctx)
+               if err != nil {
+                       logger.WithError(err).Info("error getting database connection")
+                       dbl.mtx.Unlock()
+                       continue
+               }
+               _, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.key)
+               if err != nil {
+                       logger.WithError(err).Infof("error getting pg_advisory_lock %d", dbl.key)
+                       conn.Close()
+                       dbl.mtx.Unlock()
+                       continue
+               }
+               logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+               dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
+               dbl.mtx.Unlock()
+               return
+       }
+}
+
+// Check confirms that the lock is still active (i.e., the session is
+// still alive), and re-acquires if needed. Panics if Lock is not
+// acquired first.
+func (dbl *DBLocker) Check() {
+       dbl.mtx.Lock()
+       err := dbl.conn.PingContext(dbl.ctx)
+       if err == nil {
+               ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+               dbl.mtx.Unlock()
+               return
+       }
+       ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
+       dbl.conn.Close()
+       dbl.conn = nil
+       ctx, getdb := dbl.ctx, dbl.getdb
+       dbl.mtx.Unlock()
+       dbl.Lock(ctx, getdb)
+}
+
+func (dbl *DBLocker) Unlock() {
+       dbl.mtx.Lock()
+       defer dbl.mtx.Unlock()
+       if dbl.conn != nil {
+               _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
+               if err != nil {
+                       ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+               } else {
+                       ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+               }
+               dbl.conn.Close()
+               dbl.conn = nil
+       }
+}
diff --git a/lib/controller/trash.go b/lib/controller/trash.go
new file mode 100644
index 0000000..551b2f9
--- /dev/null
+++ b/lib/controller/trash.go
@@ -0,0 +1,33 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+       "time"
+
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+)
+
+func (h *Handler) trashSweepWorker() {
+       sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
+       logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+       ctx := ctxlog.Context(h.BackgroundContext, logger)
+       if sleep <= 0 {
+               logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+               return
+       }
+       dblock.TrashSweep.Lock(ctx, h.db)
+       defer dblock.TrashSweep.Unlock()
+       for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
+               dblock.TrashSweep.Check()
+               ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+               _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+               if err != nil {
+                       logger.WithError(err).Info("trash sweep failed")
+               }
+       }
+}
diff --git a/lib/controller/worker.go b/lib/controller/worker.go
deleted file mode 100644
index 02f3db3..0000000
--- a/lib/controller/worker.go
+++ /dev/null
@@ -1,95 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package controller
-
-import (
-       "context"
-       "database/sql"
-       "time"
-
-       "git.arvados.org/arvados.git/sdk/go/auth"
-       "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "github.com/jmoiron/sqlx"
-)
-
-const (
-       // lock keys should be added here with explicit values, to
-       // ensure they do not get accidentally renumbered when a key
-       // is added or removed.
-       lockKeyTrashSweep = 10001
-)
-
-// dbLocker uses pg_advisory_lock to maintain a cluster-wide lock for
-// a long-running task like "do X every N seconds".
-type dbLocker struct {
-       GetDB   func(context.Context) (*sqlx.DB, error)
-       LockKey int
-
-       conn *sql.Conn // != nil if advisory lock is acquired
-}
-
-// Lock acquires the advisory lock the first time it is
-// called. Subsequent calls confirm that the lock is still active
-// (i.e., the session is still alive), and re-acquires if needed.
-func (dbl *dbLocker) Lock(ctx context.Context) {
-       logger := ctxlog.FromContext(ctx)
-       for ; ; time.Sleep(5 * time.Second) {
-               if dbl.conn == nil {
-                       db, err := dbl.GetDB(ctx)
-                       if err != nil {
-                               logger.WithError(err).Infof("error getting database pool")
-                               continue
-                       }
-                       conn, err := db.Conn(ctx)
-                       if err != nil {
-                               logger.WithError(err).Info("error getting database connection")
-                               continue
-                       }
-                       _, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.LockKey)
-                       if err != nil {
-                               logger.WithError(err).Info("error getting lock")
-                               conn.Close()
-                               continue
-                       }
-                       dbl.conn = conn
-               }
-               err := dbl.conn.PingContext(ctx)
-               if err != nil {
-                       logger.WithError(err).Info("database connection ping failed")
-                       dbl.conn.Close()
-                       dbl.conn = nil
-                       continue
-               }
-               return
-       }
-}
-
-func (dbl *dbLocker) Unlock() {
-       if dbl.conn != nil {
-               dbl.conn.Close()
-               dbl.conn = nil
-       }
-}
-
-func (h *Handler) trashSweepWorker() {
-       sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
-       logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
-       ctx := ctxlog.Context(h.BackgroundContext, logger)
-       if sleep <= 0 {
-               logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
-               return
-       }
-       locker := &dbLocker{GetDB: h.db, LockKey: lockKeyTrashSweep}
-       locker.Lock(ctx)
-       defer locker.Unlock()
-       for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
-               locker.Lock(ctx)
-               ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
-               _, err := h.federation.SysTrashSweep(ctx, struct{}{})
-               if err != nil {
-                       logger.WithError(err).Info("trash sweep failed")
-               }
-       }
-}