Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / lib / controller / dblock / dblock.go
index 1a36822d5b7f91e81c5b0deb167a105a962b3dfb..ad2733abfa36df82c72c4aa3c7a6c090c6496efb 100644 (file)
@@ -7,6 +7,8 @@ package dblock
 import (
        "context"
        "database/sql"
+       "fmt"
+       "net"
        "sync"
        "time"
 
@@ -15,8 +17,12 @@ import (
 )
 
 var (
-       TrashSweep = &DBLocker{key: 10001}
-       retryDelay = 5 * time.Second
+       TrashSweep         = &DBLocker{key: 10001}
+       ContainerLogSweep  = &DBLocker{key: 10002}
+       KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
+       KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+       Dispatch           = &DBLocker{key: 10005} // any dispatcher running
+       retryDelay         = 5 * time.Second
 )
 
 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
@@ -30,8 +36,11 @@ type DBLocker struct {
 }
 
 // Lock acquires the advisory lock, waiting/reconnecting if needed.
-func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
-       logger := ctxlog.FromContext(ctx)
+//
+// Returns false if ctx is canceled before the lock is acquired.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
+       logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
+       var lastHeldBy string
        for ; ; time.Sleep(retryDelay) {
                dbl.mtx.Lock()
                if dbl.conn != nil {
@@ -40,55 +49,87 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
                        dbl.mtx.Unlock()
                        continue
                }
+               if ctx.Err() != nil {
+                       dbl.mtx.Unlock()
+                       return false
+               }
                db, err := getdb(ctx)
-               if err != nil {
-                       logger.WithError(err).Infof("error getting database pool")
+               if err == context.Canceled {
+                       dbl.mtx.Unlock()
+                       return false
+               } else if err != nil {
+                       logger.WithError(err).Info("error getting database pool")
                        dbl.mtx.Unlock()
                        continue
                }
                conn, err := db.Conn(ctx)
-               if err != nil {
+               if err == context.Canceled {
+                       dbl.mtx.Unlock()
+                       return false
+               } else if err != nil {
                        logger.WithError(err).Info("error getting database connection")
                        dbl.mtx.Unlock()
                        continue
                }
                var locked bool
                err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
-               if err != nil {
-                       logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
+               if err == context.Canceled {
+                       return false
+               } else if err != nil {
+                       logger.WithError(err).Info("error getting pg_try_advisory_lock")
                        conn.Close()
                        dbl.mtx.Unlock()
                        continue
                }
                if !locked {
+                       var host string
+                       var port int
+                       err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
+                               (SELECT pid FROM pg_locks
+                                WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
+                       if err != nil {
+                               logger.WithError(err).Info("error getting other client info")
+                       } else {
+                               heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
+                               if lastHeldBy != heldBy {
+                                       logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
+                                       lastHeldBy = heldBy
+                               }
+                       }
                        conn.Close()
                        dbl.mtx.Unlock()
                        continue
                }
-               logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+               logger.Debug("acquired pg_advisory_lock")
                dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
                dbl.mtx.Unlock()
-               return
+               return true
        }
 }
 
 // Check confirms that the lock is still active (i.e., the session is
 // still alive), and re-acquires if needed. Panics if Lock is not
 // acquired first.
-func (dbl *DBLocker) Check() {
+//
+// Returns false if the context passed to Lock() is canceled before
+// the lock is confirmed or reacquired.
+func (dbl *DBLocker) Check() bool {
        dbl.mtx.Lock()
        err := dbl.conn.PingContext(dbl.ctx)
-       if err == nil {
-               ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+       if err == context.Canceled {
+               dbl.mtx.Unlock()
+               return false
+       } else if err == nil {
+               ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
                dbl.mtx.Unlock()
-               return
+               return true
        }
        ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
        dbl.conn.Close()
        dbl.conn = nil
        ctx, getdb := dbl.ctx, dbl.getdb
        dbl.mtx.Unlock()
-       dbl.Lock(ctx, getdb)
+       return dbl.Lock(ctx, getdb)
 }
 
 func (dbl *DBLocker) Unlock() {
@@ -97,9 +138,9 @@ func (dbl *DBLocker) Unlock() {
        if dbl.conn != nil {
                _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
                if err != nil {
-                       ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+                       ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
                } else {
-                       ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+                       ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
                }
                dbl.conn.Close()
                dbl.conn = nil