1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
16 "github.com/jmoiron/sqlx"
// Predefined lockers. Each one has its own advisory-lock key, which
// Lock passes to pg_try_advisory_lock.
20 TrashSweep = &DBLocker{key: 10001}
21 ContainerLogSweep = &DBLocker{key: 10002}
22 KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
23 KeepBalanceActive = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
24 Dispatch = &DBLocker{key: 10005} // any dispatcher running
25 RailsMigrations = &DBLocker{key: 10006}
// retryDelay is how long Lock sleeps between attempts to acquire a
// lock.
26 retryDelay = 5 * time.Second
29 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
30 // a long-running task like "do X every N seconds".
//
// Lock takes the lock with pg_try_advisory_lock on a single database
// connection, Check pings that connection and calls Lock again if the
// ping fails, and Unlock runs pg_advisory_unlock on it.
31 type DBLocker struct {
// getdb is the database-handle getter given to Lock. Lock saves it
// here once the lock is acquired so that Check can call Lock again.
35 getdb func(context.Context) (*sqlx.DB, error)
36 conn *sql.Conn // != nil if advisory lock has been acquired
39 // Lock acquires the advisory lock, waiting/reconnecting if needed.
//
// getdb is expected to return the database handle from which the
// lock connection is taken. Once the lock is acquired, ctx, getdb and
// the connection are stored in dbl so that Check can verify or
// re-acquire the lock later.
//
41 // Returns false if ctx is canceled before the lock is acquired.
//
// NOTE(review): cancellation is detected with err == context.Canceled,
// which does not match wrapped errors; consider errors.Is.
42 func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
43 logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
// Retry indefinitely: the loop has no condition, and its post
// statement sleeps retryDelay before each new attempt.
45 for ; ; time.Sleep(retryDelay) {
48 // Another goroutine is already locked/waiting
49 // on this lock. Wait for them to release.
// Cancellation is handled separately from other errors; any other
// error is logged at Info level.
// NOTE(review): presumably err here comes from getdb(ctx) -- confirm.
58 if err == context.Canceled {
61 } else if err != nil {
62 logger.WithError(err).Info("error getting database pool")
// Take a single connection from the pool. On success this is the
// connection stored in dbl.conn below.
66 conn, err := db.Conn(ctx)
67 if err == context.Canceled {
70 } else if err != nil {
71 logger.WithError(err).Info("error getting database connection")
// Non-blocking attempt to take the lock; the boolean result of
// pg_try_advisory_lock is scanned into locked.
76 err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
77 if err == context.Canceled {
79 } else if err != nil {
80 logger.WithError(err).Info("error getting pg_try_advisory_lock")
// Look up the client address and port of the session that holds an
// advisory lock with this key, so the log can say who we are waiting
// for.
88 err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
89 (SELECT pid FROM pg_locks
90 WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
92 logger.WithError(err).Info("error getting other client info")
94 heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
// Log only when the reported holder differs from lastHeldBy.
95 if lastHeldBy != heldBy {
96 logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
104 logger.Debug("acquired pg_advisory_lock")
// Save ctx, getdb and the locked connection for Check.
105 dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
111 // Check confirms that the lock is still active (i.e., the session is
112 // still alive), and re-acquires if needed. Panics if Lock is not
// acquired first.
//
115 // Returns false if the context passed to Lock() is canceled before
116 // the lock is confirmed or reacquired.
117 func (dbl *DBLocker) Check() bool {
// Ping the connection saved by Lock, using the context saved by Lock.
119 err := dbl.conn.PingContext(dbl.ctx)
120 if err == context.Canceled {
123 } else if err == nil {
124 ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
// NOTE(review): presumably reached only when the ping returned an
// error other than context.Canceled -- confirm.
128 ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
// Copy the saved ctx and getdb into locals, then go through Lock
// again to re-acquire.
131 ctx, getdb := dbl.ctx, dbl.getdb
133 return dbl.Lock(ctx, getdb)
// Unlock releases the advisory lock by running pg_advisory_unlock on
// the lock's connection. A failure to release is logged, not
// returned.
136 func (dbl *DBLocker) Unlock() {
// Release dbl.mtx when Unlock returns.
138 defer dbl.mtx.Unlock()
// The unlock query runs with context.Background() rather than
// dbl.ctx, so it is not subject to cancellation of the context that
// was passed to Lock.
140 _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
142 ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
144 ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")