1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/sdk/go/ctxlog"
14 "github.com/jmoiron/sqlx"
18 TrashSweep = &DBLocker{key: 10001}
19 ContainerLogSweep = &DBLocker{key: 10002}
20 KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
21 KeepBalanceActive = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
22 retryDelay = 5 * time.Second
25 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
26 // a long-running task like "do X every N seconds".
27 type DBLocker struct {
31 getdb func(context.Context) (*sqlx.DB, error)
32 conn *sql.Conn // != nil if advisory lock has been acquired
35 // Lock acquires the advisory lock, waiting/reconnecting if needed.
37 // Returns false if ctx is canceled before the lock is acquired.
38 func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
39 logger := ctxlog.FromContext(ctx)
40 for ; ; time.Sleep(retryDelay) {
43 // Another goroutine is already locked/waiting
44 // on this lock. Wait for them to release.
53 if err == context.Canceled {
56 } else if err != nil {
57 logger.WithError(err).Infof("error getting database pool")
61 conn, err := db.Conn(ctx)
62 if err == context.Canceled {
65 } else if err != nil {
66 logger.WithError(err).Info("error getting database connection")
71 err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
72 if err == context.Canceled {
74 } else if err != nil {
75 logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
85 logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
86 dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
92 // Check confirms that the lock is still active (i.e., the session is
93 // still alive), and re-acquires if needed. Panics if Lock is not
96 // Returns false if the context passed to Lock() is canceled before
97 // the lock is confirmed or reacquired.
98 func (dbl *DBLocker) Check() bool {
100 err := dbl.conn.PingContext(dbl.ctx)
101 if err == context.Canceled {
104 } else if err == nil {
105 ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
109 ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
112 ctx, getdb := dbl.ctx, dbl.getdb
114 return dbl.Lock(ctx, getdb)
117 func (dbl *DBLocker) Unlock() {
119 defer dbl.mtx.Unlock()
121 _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
123 ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
125 ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)