Merge branch '19144-wb-copy-collections-fix'. Closes #19144
[arvados.git] / lib / controller / dblock / dblock.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dblock
6
7 import (
8         "context"
9         "database/sql"
10         "sync"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/ctxlog"
14         "github.com/jmoiron/sqlx"
15 )
16
17 var (
18         TrashSweep = &DBLocker{key: 10001}
19         retryDelay = 5 * time.Second
20 )
21
22 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
23 // a long-running task like "do X every N seconds".
24 type DBLocker struct {
25         key   int
26         mtx   sync.Mutex
27         ctx   context.Context
28         getdb func(context.Context) (*sqlx.DB, error)
29         conn  *sql.Conn // != nil if advisory lock has been acquired
30 }
31
32 // Lock acquires the advisory lock, waiting/reconnecting if needed.
33 func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
34         logger := ctxlog.FromContext(ctx)
35         for ; ; time.Sleep(retryDelay) {
36                 dbl.mtx.Lock()
37                 if dbl.conn != nil {
38                         // Another goroutine is already locked/waiting
39                         // on this lock. Wait for them to release.
40                         dbl.mtx.Unlock()
41                         continue
42                 }
43                 db, err := getdb(ctx)
44                 if err != nil {
45                         logger.WithError(err).Infof("error getting database pool")
46                         dbl.mtx.Unlock()
47                         continue
48                 }
49                 conn, err := db.Conn(ctx)
50                 if err != nil {
51                         logger.WithError(err).Info("error getting database connection")
52                         dbl.mtx.Unlock()
53                         continue
54                 }
55                 var locked bool
56                 err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
57                 if err != nil {
58                         logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
59                         conn.Close()
60                         dbl.mtx.Unlock()
61                         continue
62                 }
63                 if !locked {
64                         conn.Close()
65                         dbl.mtx.Unlock()
66                         continue
67                 }
68                 logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
69                 dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
70                 dbl.mtx.Unlock()
71                 return
72         }
73 }
74
75 // Check confirms that the lock is still active (i.e., the session is
76 // still alive), and re-acquires if needed. Panics if Lock is not
77 // acquired first.
78 func (dbl *DBLocker) Check() {
79         dbl.mtx.Lock()
80         err := dbl.conn.PingContext(dbl.ctx)
81         if err == nil {
82                 ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
83                 dbl.mtx.Unlock()
84                 return
85         }
86         ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
87         dbl.conn.Close()
88         dbl.conn = nil
89         ctx, getdb := dbl.ctx, dbl.getdb
90         dbl.mtx.Unlock()
91         dbl.Lock(ctx, getdb)
92 }
93
94 func (dbl *DBLocker) Unlock() {
95         dbl.mtx.Lock()
96         defer dbl.mtx.Unlock()
97         if dbl.conn != nil {
98                 _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
99                 if err != nil {
100                         ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
101                 } else {
102                         ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
103                 }
104                 dbl.conn.Close()
105                 dbl.conn = nil
106         }
107 }