Merge branch '21189-changeset-limit'
[arvados.git] / lib / controller / dblock / dblock.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dblock
6
7 import (
8         "context"
9         "database/sql"
10         "fmt"
11         "net"
12         "sync"
13         "time"
14
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         "github.com/jmoiron/sqlx"
17 )
18
19 var (
20         TrashSweep         = &DBLocker{key: 10001}
21         ContainerLogSweep  = &DBLocker{key: 10002}
22         KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
23         KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
24         Dispatch           = &DBLocker{key: 10005} // any dispatcher running
25         RailsMigrations    = &DBLocker{key: 10006}
26         retryDelay         = 5 * time.Second
27 )
28
// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
// a long-running task like "do X every N seconds".
//
// mtx guards ctx, getdb, and conn; Lock, Check, and Unlock only touch
// them while holding it.
type DBLocker struct {
	// key is the advisory lock key passed to pg_try_advisory_lock
	// and pg_advisory_unlock.
	key int
	mtx sync.Mutex
	// ctx and getdb are the arguments of the Lock call that
	// acquired the lock; Check reuses them to re-acquire it.
	ctx   context.Context
	getdb func(context.Context) (*sqlx.DB, error)
	// conn is the session holding the advisory lock. While it is
	// non-nil, other goroutines calling Lock wait for it to be
	// released.
	conn *sql.Conn // != nil if advisory lock has been acquired
}
38
39 // Lock acquires the advisory lock, waiting/reconnecting if needed.
40 //
41 // Returns false if ctx is canceled before the lock is acquired.
42 func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
43         logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
44         var lastHeldBy string
45         for ; ; time.Sleep(retryDelay) {
46                 dbl.mtx.Lock()
47                 if dbl.conn != nil {
48                         // Another goroutine is already locked/waiting
49                         // on this lock. Wait for them to release.
50                         dbl.mtx.Unlock()
51                         continue
52                 }
53                 if ctx.Err() != nil {
54                         dbl.mtx.Unlock()
55                         return false
56                 }
57                 db, err := getdb(ctx)
58                 if err == context.Canceled {
59                         dbl.mtx.Unlock()
60                         return false
61                 } else if err != nil {
62                         logger.WithError(err).Info("error getting database pool")
63                         dbl.mtx.Unlock()
64                         continue
65                 }
66                 conn, err := db.Conn(ctx)
67                 if err == context.Canceled {
68                         dbl.mtx.Unlock()
69                         return false
70                 } else if err != nil {
71                         logger.WithError(err).Info("error getting database connection")
72                         dbl.mtx.Unlock()
73                         continue
74                 }
75                 var locked bool
76                 err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
77                 if err == context.Canceled {
78                         return false
79                 } else if err != nil {
80                         logger.WithError(err).Info("error getting pg_try_advisory_lock")
81                         conn.Close()
82                         dbl.mtx.Unlock()
83                         continue
84                 }
85                 if !locked {
86                         var host string
87                         var port int
88                         err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
89                                 (SELECT pid FROM pg_locks
90                                  WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
91                         if err != nil {
92                                 logger.WithError(err).Info("error getting other client info")
93                         } else {
94                                 heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
95                                 if lastHeldBy != heldBy {
96                                         logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
97                                         lastHeldBy = heldBy
98                                 }
99                         }
100                         conn.Close()
101                         dbl.mtx.Unlock()
102                         continue
103                 }
104                 logger.Debug("acquired pg_advisory_lock")
105                 dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
106                 dbl.mtx.Unlock()
107                 return true
108         }
109 }
110
111 // Check confirms that the lock is still active (i.e., the session is
112 // still alive), and re-acquires if needed. Panics if Lock is not
113 // acquired first.
114 //
115 // Returns false if the context passed to Lock() is canceled before
116 // the lock is confirmed or reacquired.
117 func (dbl *DBLocker) Check() bool {
118         dbl.mtx.Lock()
119         err := dbl.conn.PingContext(dbl.ctx)
120         if err == context.Canceled {
121                 dbl.mtx.Unlock()
122                 return false
123         } else if err == nil {
124                 ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
125                 dbl.mtx.Unlock()
126                 return true
127         }
128         ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
129         dbl.conn.Close()
130         dbl.conn = nil
131         ctx, getdb := dbl.ctx, dbl.getdb
132         dbl.mtx.Unlock()
133         return dbl.Lock(ctx, getdb)
134 }
135
136 func (dbl *DBLocker) Unlock() {
137         dbl.mtx.Lock()
138         defer dbl.mtx.Unlock()
139         if dbl.conn != nil {
140                 _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
141                 if err != nil {
142                         ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
143                 } else {
144                         ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
145                 }
146                 dbl.conn.Close()
147                 dbl.conn = nil
148         }
149 }