6f5db10066db100063f2b60682144eef1e35a2b5
[arvados.git] / lib / controller / dblock / dblock.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dblock
6
7 import (
8         "context"
9         "database/sql"
10         "fmt"
11         "net"
12         "sync"
13         "time"
14
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         "github.com/jmoiron/sqlx"
17 )
18
19 var (
20         TrashSweep         = &DBLocker{key: 10001}
21         ContainerLogSweep  = &DBLocker{key: 10002}
22         KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
23         KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
24         retryDelay         = 5 * time.Second
25 )
26
27 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
28 // a long-running task like "do X every N seconds".
29 type DBLocker struct {
30         key   int
31         mtx   sync.Mutex
32         ctx   context.Context
33         getdb func(context.Context) (*sqlx.DB, error)
34         conn  *sql.Conn // != nil if advisory lock has been acquired
35 }
36
37 // Lock acquires the advisory lock, waiting/reconnecting if needed.
38 //
39 // Returns false if ctx is canceled before the lock is acquired.
40 func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
41         logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
42         var lastHeldBy string
43         for ; ; time.Sleep(retryDelay) {
44                 dbl.mtx.Lock()
45                 if dbl.conn != nil {
46                         // Another goroutine is already locked/waiting
47                         // on this lock. Wait for them to release.
48                         dbl.mtx.Unlock()
49                         continue
50                 }
51                 if ctx.Err() != nil {
52                         dbl.mtx.Unlock()
53                         return false
54                 }
55                 db, err := getdb(ctx)
56                 if err == context.Canceled {
57                         dbl.mtx.Unlock()
58                         return false
59                 } else if err != nil {
60                         logger.WithError(err).Info("error getting database pool")
61                         dbl.mtx.Unlock()
62                         continue
63                 }
64                 conn, err := db.Conn(ctx)
65                 if err == context.Canceled {
66                         dbl.mtx.Unlock()
67                         return false
68                 } else if err != nil {
69                         logger.WithError(err).Info("error getting database connection")
70                         dbl.mtx.Unlock()
71                         continue
72                 }
73                 var locked bool
74                 err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
75                 if err == context.Canceled {
76                         return false
77                 } else if err != nil {
78                         logger.WithError(err).Info("error getting pg_try_advisory_lock")
79                         conn.Close()
80                         dbl.mtx.Unlock()
81                         continue
82                 }
83                 if !locked {
84                         var host string
85                         var port int
86                         err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
87                                 (SELECT pid FROM pg_locks
88                                  WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
89                         if err != nil {
90                                 logger.WithError(err).Info("error getting other client info")
91                         } else {
92                                 heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
93                                 if lastHeldBy != heldBy {
94                                         logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
95                                         lastHeldBy = heldBy
96                                 }
97                         }
98                         conn.Close()
99                         dbl.mtx.Unlock()
100                         continue
101                 }
102                 logger.Debug("acquired pg_advisory_lock")
103                 dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
104                 dbl.mtx.Unlock()
105                 return true
106         }
107 }
108
109 // Check confirms that the lock is still active (i.e., the session is
110 // still alive), and re-acquires if needed. Panics if Lock is not
111 // acquired first.
112 //
113 // Returns false if the context passed to Lock() is canceled before
114 // the lock is confirmed or reacquired.
115 func (dbl *DBLocker) Check() bool {
116         dbl.mtx.Lock()
117         err := dbl.conn.PingContext(dbl.ctx)
118         if err == context.Canceled {
119                 dbl.mtx.Unlock()
120                 return false
121         } else if err == nil {
122                 ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
123                 dbl.mtx.Unlock()
124                 return true
125         }
126         ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
127         dbl.conn.Close()
128         dbl.conn = nil
129         ctx, getdb := dbl.ctx, dbl.getdb
130         dbl.mtx.Unlock()
131         return dbl.Lock(ctx, getdb)
132 }
133
134 func (dbl *DBLocker) Unlock() {
135         dbl.mtx.Lock()
136         defer dbl.mtx.Unlock()
137         if dbl.conn != nil {
138                 _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
139                 if err != nil {
140                         ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
141                 } else {
142                         ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
143                 }
144                 dbl.conn.Close()
145                 dbl.conn = nil
146         }
147 }