18071: Use dblock to prevent multiple dispatchers from competing.
[arvados.git] / lib / controller / dblock / dblock.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dblock
6
7 import (
8         "context"
9         "database/sql"
10         "fmt"
11         "net"
12         "sync"
13         "time"
14
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         "github.com/jmoiron/sqlx"
17 )
18
19 var (
20         TrashSweep         = &DBLocker{key: 10001}
21         ContainerLogSweep  = &DBLocker{key: 10002}
22         KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
23         KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
24         Dispatch           = &DBLocker{key: 10005} // any dispatcher running
25         retryDelay         = 5 * time.Second
26 )
27
28 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
29 // a long-running task like "do X every N seconds".
30 type DBLocker struct {
31         key   int
32         mtx   sync.Mutex
33         ctx   context.Context
34         getdb func(context.Context) (*sqlx.DB, error)
35         conn  *sql.Conn // != nil if advisory lock has been acquired
36 }
37
38 // Lock acquires the advisory lock, waiting/reconnecting if needed.
39 //
40 // Returns false if ctx is canceled before the lock is acquired.
41 func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
42         logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
43         var lastHeldBy string
44         for ; ; time.Sleep(retryDelay) {
45                 dbl.mtx.Lock()
46                 if dbl.conn != nil {
47                         // Another goroutine is already locked/waiting
48                         // on this lock. Wait for them to release.
49                         dbl.mtx.Unlock()
50                         continue
51                 }
52                 if ctx.Err() != nil {
53                         dbl.mtx.Unlock()
54                         return false
55                 }
56                 db, err := getdb(ctx)
57                 if err == context.Canceled {
58                         dbl.mtx.Unlock()
59                         return false
60                 } else if err != nil {
61                         logger.WithError(err).Info("error getting database pool")
62                         dbl.mtx.Unlock()
63                         continue
64                 }
65                 conn, err := db.Conn(ctx)
66                 if err == context.Canceled {
67                         dbl.mtx.Unlock()
68                         return false
69                 } else if err != nil {
70                         logger.WithError(err).Info("error getting database connection")
71                         dbl.mtx.Unlock()
72                         continue
73                 }
74                 var locked bool
75                 err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
76                 if err == context.Canceled {
77                         return false
78                 } else if err != nil {
79                         logger.WithError(err).Info("error getting pg_try_advisory_lock")
80                         conn.Close()
81                         dbl.mtx.Unlock()
82                         continue
83                 }
84                 if !locked {
85                         var host string
86                         var port int
87                         err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
88                                 (SELECT pid FROM pg_locks
89                                  WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
90                         if err != nil {
91                                 logger.WithError(err).Info("error getting other client info")
92                         } else {
93                                 heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
94                                 if lastHeldBy != heldBy {
95                                         logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
96                                         lastHeldBy = heldBy
97                                 }
98                         }
99                         conn.Close()
100                         dbl.mtx.Unlock()
101                         continue
102                 }
103                 logger.Debug("acquired pg_advisory_lock")
104                 dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
105                 dbl.mtx.Unlock()
106                 return true
107         }
108 }
109
110 // Check confirms that the lock is still active (i.e., the session is
111 // still alive), and re-acquires if needed. Panics if Lock is not
112 // acquired first.
113 //
114 // Returns false if the context passed to Lock() is canceled before
115 // the lock is confirmed or reacquired.
116 func (dbl *DBLocker) Check() bool {
117         dbl.mtx.Lock()
118         err := dbl.conn.PingContext(dbl.ctx)
119         if err == context.Canceled {
120                 dbl.mtx.Unlock()
121                 return false
122         } else if err == nil {
123                 ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
124                 dbl.mtx.Unlock()
125                 return true
126         }
127         ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
128         dbl.conn.Close()
129         dbl.conn = nil
130         ctx, getdb := dbl.ctx, dbl.getdb
131         dbl.mtx.Unlock()
132         return dbl.Lock(ctx, getdb)
133 }
134
135 func (dbl *DBLocker) Unlock() {
136         dbl.mtx.Lock()
137         defer dbl.mtx.Unlock()
138         if dbl.conn != nil {
139                 _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
140                 if err != nil {
141                         ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
142                 } else {
143                         ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
144                 }
145                 dbl.conn.Close()
146                 dbl.conn = nil
147         }
148 }