From e33a15001d7a94a805a5d0d4c77544d959974193 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 27 Oct 2022 11:05:42 -0400 Subject: [PATCH] 18071: Log host:port of other process(es) holding lock. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/controller/dblock/dblock.go | 31 +++++++--- lib/controller/dblock/dblock_test.go | 91 ++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 7 deletions(-) create mode 100644 lib/controller/dblock/dblock_test.go diff --git a/lib/controller/dblock/dblock.go b/lib/controller/dblock/dblock.go index 472633747c..6f5db10066 100644 --- a/lib/controller/dblock/dblock.go +++ b/lib/controller/dblock/dblock.go @@ -7,6 +7,8 @@ package dblock import ( "context" "database/sql" + "fmt" + "net" "sync" "time" @@ -36,7 +38,8 @@ type DBLocker struct { // // Returns false if ctx is canceled before the lock is acquired. func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool { - logger := ctxlog.FromContext(ctx) + logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key) + var lastHeldBy string for ; ; time.Sleep(retryDelay) { dbl.mtx.Lock() if dbl.conn != nil { @@ -54,7 +57,7 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql dbl.mtx.Unlock() return false } else if err != nil { - logger.WithError(err).Infof("error getting database pool") + logger.WithError(err).Info("error getting database pool") dbl.mtx.Unlock() continue } @@ -72,17 +75,31 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql if err == context.Canceled { return false } else if err != nil { - logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key) + logger.WithError(err).Info("error getting pg_try_advisory_lock") conn.Close() dbl.mtx.Unlock() continue } if !locked { + var host string + var port int + err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN + (SELECT pid FROM pg_locks + WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port) + if err != nil { + logger.WithError(err).Info("error getting other client info") + } else { + heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port)) + if lastHeldBy != heldBy { + logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock") + lastHeldBy = heldBy + } + } conn.Close() dbl.mtx.Unlock() continue } - logger.Debugf("acquired pg_advisory_lock %d", dbl.key) + logger.Debug("acquired pg_advisory_lock") dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn dbl.mtx.Unlock() return true @@ -102,7 +119,7 @@ func (dbl *DBLocker) Check() bool { dbl.mtx.Unlock() return false } else if err == nil { - ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key) + ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive") dbl.mtx.Unlock() return true } @@ -120,9 +137,9 @@ func (dbl *DBLocker) Unlock() { if dbl.conn != nil { _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key) if err != nil { - ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key) + ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock") } else { - ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key) + ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock") } dbl.conn.Close() dbl.conn = nil diff --git a/lib/controller/dblock/dblock_test.go b/lib/controller/dblock/dblock_test.go new file mode 100644 index 0000000000..6079df6f87 --- /dev/null +++ b/lib/controller/dblock/dblock_test.go @@ -0,0 +1,91 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package dblock + +import ( + "bytes" + "context" + "sync" + "testing" + "time" + + "git.arvados.org/arvados.git/lib/config" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadostest" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/jmoiron/sqlx" + "github.com/sirupsen/logrus" + check "gopkg.in/check.v1" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +var _ = check.Suite(&suite{}) + +type suite struct { + cluster *arvados.Cluster + db *sqlx.DB + getdb func(context.Context) (*sqlx.DB, error) +} + +var testLocker = &DBLocker{key: 999} + +func (s *suite) SetUpSuite(c *check.C) { + cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load() + c.Assert(err, check.IsNil) + s.cluster, err = cfg.GetCluster("") + c.Assert(err, check.IsNil) + s.db = arvadostest.DB(c, s.cluster) + s.getdb = func(context.Context) (*sqlx.DB, error) { return s.db, nil } +} + +func (s *suite) TestLock(c *check.C) { + retryDelay = time.Millisecond + + var logbuf bytes.Buffer + logger := ctxlog.New(&logbuf, "text", "debug") + logger.Level = logrus.DebugLevel + ctx := ctxlog.Context(context.Background(), logger) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + testLocker.Lock(ctx, s.getdb) + testLocker.Check() + + lock2 := make(chan bool) + var wg sync.WaitGroup + defer wg.Wait() + wg.Add(1) + go func() { + defer wg.Done() + testLocker2 := &DBLocker{key: 999} + testLocker2.Lock(ctx, s.getdb) + close(lock2) + testLocker2.Check() + testLocker2.Unlock() + }() + + // Second lock should wait for first to Unlock + select { + case <-time.After(time.Second / 10): + c.Check(logbuf.String(), check.Matches, `(?ms).*level=info.*DBClient="[^"]+:\d+".*ID=999.*`) + case <-lock2: + c.Log("double-lock") + c.Fail() + } + + testLocker.Check() + testLocker.Unlock() + + // Now the second lock should succeed within retryDelay + select { + case <-time.After(retryDelay * 2): + c.Log("timed out") + c.Fail() + case <-lock2: + } + c.Logf("%s", logbuf.String()) +} -- 2.30.2