X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9828e9218084856240fdeafa2d388d8bf322e655..09cbdc3074b3f1e69c9c537875146f6da0a6ed8f:/lib/controller/dblock/dblock.go

# Patch to lib/controller/dblock/dblock.go (package dblock).
#
# Visible changes in the hunks below:
#   * imports "fmt" and "net" are added;
#   * four DBLocker values are added next to TrashSweep (key 10001):
#     ContainerLogSweep (10002), KeepBalanceService (10003),
#     KeepBalanceActive (10004) and Dispatch (10005);
#   * Lock gains a bool result and is documented as returning false if
#     ctx is canceled before the lock is acquired.

diff --git a/lib/controller/dblock/dblock.go b/lib/controller/dblock/dblock.go
index 1a36822d5b..ad2733abfa 100644
--- a/lib/controller/dblock/dblock.go
+++ b/lib/controller/dblock/dblock.go
@@ -7,6 +7,8 @@ package dblock
 import (
 	"context"
 	"database/sql"
+	"fmt"
+	"net"
 	"sync"
 	"time"
 
@@ -15,8 +17,12 @@ import (
 )
 
 var (
-	TrashSweep = &DBLocker{key: 10001}
-	retryDelay = 5 * time.Second
+	TrashSweep         = &DBLocker{key: 10001}
+	ContainerLogSweep  = &DBLocker{key: 10002}
+	KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
+	KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+	Dispatch           = &DBLocker{key: 10005} // any dispatcher running
+	retryDelay         = 5 * time.Second
 )
 
 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
@@ -30,8 +36,11 @@ type DBLocker struct {
 }
 
 // Lock acquires the advisory lock, waiting/reconnecting if needed.
-func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
-	logger := ctxlog.FromContext(ctx)
+//
+// Returns false if ctx is canceled before the lock is acquired.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
+	logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
+	var lastHeldBy string
 	for ; ; time.Sleep(retryDelay) {
 		dbl.mtx.Lock()
 		if dbl.conn != nil {
@@ -40,55 +49,91 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
 			dbl.mtx.Unlock()
 			continue
 		}
+		if ctx.Err() != nil {
+			dbl.mtx.Unlock()
+			return false
+		}
 		db, err := getdb(ctx)
-		if err != nil {
-			logger.WithError(err).Infof("error getting database pool")
+		if err == context.Canceled {
+			dbl.mtx.Unlock()
+			return false
+		} else if err != nil {
+			logger.WithError(err).Info("error getting database pool")
 			dbl.mtx.Unlock()
 			continue
 		}
 		conn, err := db.Conn(ctx)
-		if err != nil {
+		if err == context.Canceled {
+			dbl.mtx.Unlock()
+			return false
+		} else if err != nil {
 			logger.WithError(err).Info("error getting database connection")
 			dbl.mtx.Unlock()
 			continue
 		}
 		var locked bool
 		err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
-		if err != nil {
-			logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
+		if err == context.Canceled {
+			// Release the connection and the mutex before giving
+			// up, like every other exit path in this loop.
+			conn.Close()
+			dbl.mtx.Unlock()
+			return false
+		} else if err != nil {
+			logger.WithError(err).Info("error getting pg_try_advisory_lock")
 			conn.Close()
 			dbl.mtx.Unlock()
 			continue
 		}
 		if !locked {
+			var host string
+			var port int
+			err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
+				(SELECT pid FROM pg_locks
+				 WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
+			if err != nil {
+				logger.WithError(err).Info("error getting other client info")
+			} else {
+				heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
+				if lastHeldBy != heldBy {
+					logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
+					lastHeldBy = heldBy
+				}
+			}
 			conn.Close()
 			dbl.mtx.Unlock()
 			continue
 		}
-		logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+		logger.Debug("acquired pg_advisory_lock")
 		dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
 		dbl.mtx.Unlock()
-		return
+		return true
 	}
 }
 
 // Check confirms that the lock is still active (i.e., the session is
 // still alive), and re-acquires if needed. Panics if Lock is not
 // acquired first.
-func (dbl *DBLocker) Check() {
+//
+// Returns false if the context passed to Lock() is canceled before
+// the lock is confirmed or reacquired.
+func (dbl *DBLocker) Check() bool {
 	dbl.mtx.Lock()
 	err := dbl.conn.PingContext(dbl.ctx)
-	if err == nil {
-		ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+	if err == context.Canceled {
+		dbl.mtx.Unlock()
+		return false
+	} else if err == nil {
+		ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
 		dbl.mtx.Unlock()
-		return
+		return true
 	}
 	ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
 	dbl.conn.Close()
 	dbl.conn = nil
 	ctx, getdb := dbl.ctx, dbl.getdb
 	dbl.mtx.Unlock()
-	dbl.Lock(ctx, getdb)
+	return dbl.Lock(ctx, getdb)
 }
 
 func (dbl *DBLocker) Unlock() {
@@ -97,9 +142,9 @@ func (dbl *DBLocker) Unlock() {
 	if dbl.conn != nil {
 		_, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
 		if err != nil {
-			ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+			ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
 		} else {
-			ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+			ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
 		}
 		dbl.conn.Close()
 		dbl.conn = nil