Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / lib / controller / dblock / dblock.go
index 472633747c5d9ba6d832777de704fc3cd25fa387..ad2733abfa36df82c72c4aa3c7a6c090c6496efb 100644 (file)
@@ -7,6 +7,8 @@ package dblock
 import (
        "context"
        "database/sql"
+       "fmt"
+       "net"
        "sync"
        "time"
 
@@ -19,6 +21,7 @@ var (
        ContainerLogSweep  = &DBLocker{key: 10002}
        KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
        KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+       Dispatch           = &DBLocker{key: 10005} // any dispatcher running
        retryDelay         = 5 * time.Second
 )
 
@@ -36,7 +39,8 @@ type DBLocker struct {
 //
 // Returns false if ctx is canceled before the lock is acquired.
 func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
-       logger := ctxlog.FromContext(ctx)
+       logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
+       var lastHeldBy string
        for ; ; time.Sleep(retryDelay) {
                dbl.mtx.Lock()
                if dbl.conn != nil {
@@ -54,7 +58,7 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
                        dbl.mtx.Unlock()
                        return false
                } else if err != nil {
-                       logger.WithError(err).Infof("error getting database pool")
+                       logger.WithError(err).Info("error getting database pool")
                        dbl.mtx.Unlock()
                        continue
                }
@@ -72,17 +76,31 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
                if err == context.Canceled {
                        return false
                } else if err != nil {
-                       logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
+                       logger.WithError(err).Info("error getting pg_try_advisory_lock")
                        conn.Close()
                        dbl.mtx.Unlock()
                        continue
                }
                if !locked {
+                       var host string
+                       var port int
+                       err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
+                               (SELECT pid FROM pg_locks
+                                WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
+                       if err != nil {
+                               logger.WithError(err).Info("error getting other client info")
+                       } else {
+                               heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
+                               if lastHeldBy != heldBy {
+                                       logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
+                                       lastHeldBy = heldBy
+                               }
+                       }
                        conn.Close()
                        dbl.mtx.Unlock()
                        continue
                }
-               logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+               logger.Debug("acquired pg_advisory_lock")
                dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
                dbl.mtx.Unlock()
                return true
@@ -102,7 +120,7 @@ func (dbl *DBLocker) Check() bool {
                dbl.mtx.Unlock()
                return false
        } else if err == nil {
-               ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+               ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
                dbl.mtx.Unlock()
                return true
        }
@@ -120,9 +138,9 @@ func (dbl *DBLocker) Unlock() {
        if dbl.conn != nil {
                _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
                if err != nil {
-                       ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+                       ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
                } else {
-                       ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+                       ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
                }
                dbl.conn.Close()
                dbl.conn = nil