18071: Use dblock to avoid concurrent keep-balance ops.
[arvados.git] / lib / controller / dblock / dblock.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dblock
6
7 import (
8         "context"
9         "database/sql"
10         "sync"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/ctxlog"
14         "github.com/jmoiron/sqlx"
15 )
16
17 var (
18         TrashSweep         = &DBLocker{key: 10001}
19         ContainerLogSweep  = &DBLocker{key: 10002}
20         KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
21         KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
22         retryDelay         = 5 * time.Second
23 )
24
25 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
26 // a long-running task like "do X every N seconds".
27 type DBLocker struct {
28         key   int
29         mtx   sync.Mutex
30         ctx   context.Context
31         getdb func(context.Context) (*sqlx.DB, error)
32         conn  *sql.Conn // != nil if advisory lock has been acquired
33 }
34
35 // Lock acquires the advisory lock, waiting/reconnecting if needed.
36 //
37 // Returns false if ctx is canceled before the lock is acquired.
38 func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
39         logger := ctxlog.FromContext(ctx)
40         for ; ; time.Sleep(retryDelay) {
41                 dbl.mtx.Lock()
42                 if dbl.conn != nil {
43                         // Another goroutine is already locked/waiting
44                         // on this lock. Wait for them to release.
45                         dbl.mtx.Unlock()
46                         continue
47                 }
48                 if ctx.Err() != nil {
49                         dbl.mtx.Unlock()
50                         return false
51                 }
52                 db, err := getdb(ctx)
53                 if err == context.Canceled {
54                         dbl.mtx.Unlock()
55                         return false
56                 } else if err != nil {
57                         logger.WithError(err).Infof("error getting database pool")
58                         dbl.mtx.Unlock()
59                         continue
60                 }
61                 conn, err := db.Conn(ctx)
62                 if err == context.Canceled {
63                         dbl.mtx.Unlock()
64                         return false
65                 } else if err != nil {
66                         logger.WithError(err).Info("error getting database connection")
67                         dbl.mtx.Unlock()
68                         continue
69                 }
70                 var locked bool
71                 err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
72                 if err == context.Canceled {
73                         return false
74                 } else if err != nil {
75                         logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
76                         conn.Close()
77                         dbl.mtx.Unlock()
78                         continue
79                 }
80                 if !locked {
81                         conn.Close()
82                         dbl.mtx.Unlock()
83                         continue
84                 }
85                 logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
86                 dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
87                 dbl.mtx.Unlock()
88                 return true
89         }
90 }
91
92 // Check confirms that the lock is still active (i.e., the session is
93 // still alive), and re-acquires if needed. Panics if Lock is not
94 // acquired first.
95 //
96 // Returns false if the context passed to Lock() is canceled before
97 // the lock is confirmed or reacquired.
98 func (dbl *DBLocker) Check() bool {
99         dbl.mtx.Lock()
100         err := dbl.conn.PingContext(dbl.ctx)
101         if err == context.Canceled {
102                 dbl.mtx.Unlock()
103                 return false
104         } else if err == nil {
105                 ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
106                 dbl.mtx.Unlock()
107                 return true
108         }
109         ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
110         dbl.conn.Close()
111         dbl.conn = nil
112         ctx, getdb := dbl.ctx, dbl.getdb
113         dbl.mtx.Unlock()
114         return dbl.Lock(ctx, getdb)
115 }
116
117 func (dbl *DBLocker) Unlock() {
118         dbl.mtx.Lock()
119         defer dbl.mtx.Unlock()
120         if dbl.conn != nil {
121                 _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
122                 if err != nil {
123                         ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
124                 } else {
125                         ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
126                 }
127                 dbl.conn.Close()
128                 dbl.conn = nil
129         }
130 }