18071: Log host:port of other process(es) holding lock.
author Tom Clegg <tom@curii.com>
Thu, 27 Oct 2022 15:05:42 +0000 (11:05 -0400)
committer Tom Clegg <tom@curii.com>
Thu, 27 Oct 2022 15:05:42 +0000 (11:05 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/controller/dblock/dblock.go
lib/controller/dblock/dblock_test.go [new file with mode: 0644]

index 472633747c5d9ba6d832777de704fc3cd25fa387..6f5db10066db100063f2b60682144eef1e35a2b5 100644 (file)
@@ -7,6 +7,8 @@ package dblock
 import (
        "context"
        "database/sql"
+       "fmt"
+       "net"
        "sync"
        "time"
 
@@ -36,7 +38,8 @@ type DBLocker struct {
 //
 // Returns false if ctx is canceled before the lock is acquired.
 func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
-       logger := ctxlog.FromContext(ctx)
+       logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
+       var lastHeldBy string
        for ; ; time.Sleep(retryDelay) {
                dbl.mtx.Lock()
                if dbl.conn != nil {
@@ -54,7 +57,7 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
                        dbl.mtx.Unlock()
                        return false
                } else if err != nil {
-                       logger.WithError(err).Infof("error getting database pool")
+                       logger.WithError(err).Info("error getting database pool")
                        dbl.mtx.Unlock()
                        continue
                }
@@ -72,17 +75,31 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
                if err == context.Canceled {
                        return false
                } else if err != nil {
-                       logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
+                       logger.WithError(err).Info("error getting pg_try_advisory_lock")
                        conn.Close()
                        dbl.mtx.Unlock()
                        continue
                }
                if !locked {
+                       var host string
+                       var port int
+                       err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
+                               (SELECT pid FROM pg_locks
+                                WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
+                       if err != nil {
+                               logger.WithError(err).Info("error getting other client info")
+                       } else {
+                               heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
+                               if lastHeldBy != heldBy {
+                                       logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
+                                       lastHeldBy = heldBy
+                               }
+                       }
                        conn.Close()
                        dbl.mtx.Unlock()
                        continue
                }
-               logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+               logger.Debug("acquired pg_advisory_lock")
                dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
                dbl.mtx.Unlock()
                return true
@@ -102,7 +119,7 @@ func (dbl *DBLocker) Check() bool {
                dbl.mtx.Unlock()
                return false
        } else if err == nil {
-               ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+               ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
                dbl.mtx.Unlock()
                return true
        }
@@ -120,9 +137,9 @@ func (dbl *DBLocker) Unlock() {
        if dbl.conn != nil {
                _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
                if err != nil {
-                       ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+                       ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
                } else {
-                       ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+                       ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
                }
                dbl.conn.Close()
                dbl.conn = nil
diff --git a/lib/controller/dblock/dblock_test.go b/lib/controller/dblock/dblock_test.go
new file mode 100644 (file)
index 0000000..6079df6
--- /dev/null
@@ -0,0 +1,91 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dblock
+
+import (
+       "bytes"
+       "context"
+       "sync"
+       "testing"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/jmoiron/sqlx"
+       "github.com/sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+// Test runs the gocheck suites registered in this package under "go test".
+func Test(t *testing.T) { check.TestingT(t) }
+
+// suite holds fixtures shared by the DBLocker tests: the loaded
+// cluster config, a database handle, and a getdb func that always
+// returns that handle.
+type suite struct {
+       cluster *arvados.Cluster
+       db      *sqlx.DB
+       getdb   func(context.Context) (*sqlx.DB, error)
+}
+
+var (
+       // Register the suite with gocheck.
+       _ = check.Suite(&suite{})
+
+       // testLocker is the first locker used by TestLock; key 999 is
+       // the advisory lock ID the tests contend on.
+       testLocker = &DBLocker{key: 999}
+)
+
+// SetUpSuite loads the default cluster config, opens the test
+// database, and prepares the getdb callback passed to DBLocker.Lock.
+func (s *suite) SetUpSuite(c *check.C) {
+       loader := config.NewLoader(nil, ctxlog.TestLogger(c))
+       cfg, err := loader.Load()
+       c.Assert(err, check.IsNil)
+
+       s.cluster, err = cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+
+       s.db = arvadostest.DB(c, s.cluster)
+       s.getdb = func(context.Context) (*sqlx.DB, error) {
+               return s.db, nil
+       }
+}
+
+// TestLock checks that a second DBLocker using the same key waits
+// (logging the host:port of the client holding the lock) until the
+// first DBLocker releases it, and then acquires it.
+func (s *suite) TestLock(c *check.C) {
+       // Shorten Lock's retry interval for this test, and restore the
+       // package-level value afterward so other tests are unaffected.
+       defer func(orig time.Duration) { retryDelay = orig }(retryDelay)
+       retryDelay = time.Millisecond
+
+       var logbuf bytes.Buffer
+       logger := ctxlog.New(&logbuf, "text", "debug")
+       logger.Level = logrus.DebugLevel
+       ctx := ctxlog.Context(context.Background(), logger)
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+       c.Assert(testLocker.Lock(ctx, s.getdb), check.Equals, true)
+       testLocker.Check()
+
+       lock2 := make(chan bool)
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               testLocker2 := &DBLocker{key: 999}
+               testLocker2.Lock(ctx, s.getdb)
+               close(lock2)
+               testLocker2.Check()
+               testLocker2.Unlock()
+       }()
+       // On any exit (including c.Fatal below), cancel ctx first so
+       // the goroutine's Lock call can return, then wait for it.
+       // Waiting before canceling could block forever.
+       defer func() {
+               cancel()
+               wg.Wait()
+       }()
+
+       // Second lock should wait for first to Unlock
+       select {
+       case <-time.After(time.Second / 10):
+       case <-lock2:
+               c.Log("double-lock")
+               c.Fail()
+       }
+
+       testLocker.Check()
+       testLocker.Unlock()
+
+       // Now the second lock should succeed. Lock retries every
+       // retryDelay, but getting a connection and running the lock
+       // query can take much longer than that on a busy test host, so
+       // use a generous timeout to avoid spurious failures.
+       select {
+       case <-time.After(10 * time.Second):
+               c.Fatal("timed out")
+       case <-lock2:
+       }
+
+       // Read logbuf only after the goroutine has exited: its logger
+       // writes to logbuf, and bytes.Buffer is not safe for concurrent
+       // use.
+       wg.Wait()
+       c.Check(logbuf.String(), check.Matches, `(?ms).*level=info.*DBClient="[^"]+:\d+".*ID=999.*`)
+       c.Logf("%s", logbuf.String())
+}