Merge branch '19377-verbose'
[arvados.git] / lib / controller / dblock / dblock.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dblock
6
7 import (
8         "context"
9         "database/sql"
10         "sync"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/ctxlog"
14         "github.com/jmoiron/sqlx"
15 )
16
17 var (
18         TrashSweep        = &DBLocker{key: 10001}
19         ContainerLogSweep = &DBLocker{key: 10002}
20         retryDelay        = 5 * time.Second
21 )
22
23 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
24 // a long-running task like "do X every N seconds".
25 type DBLocker struct {
26         key   int
27         mtx   sync.Mutex
28         ctx   context.Context
29         getdb func(context.Context) (*sqlx.DB, error)
30         conn  *sql.Conn // != nil if advisory lock has been acquired
31 }
32
33 // Lock acquires the advisory lock, waiting/reconnecting if needed.
34 func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
35         logger := ctxlog.FromContext(ctx)
36         for ; ; time.Sleep(retryDelay) {
37                 dbl.mtx.Lock()
38                 if dbl.conn != nil {
39                         // Another goroutine is already locked/waiting
40                         // on this lock. Wait for them to release.
41                         dbl.mtx.Unlock()
42                         continue
43                 }
44                 db, err := getdb(ctx)
45                 if err != nil {
46                         logger.WithError(err).Infof("error getting database pool")
47                         dbl.mtx.Unlock()
48                         continue
49                 }
50                 conn, err := db.Conn(ctx)
51                 if err != nil {
52                         logger.WithError(err).Info("error getting database connection")
53                         dbl.mtx.Unlock()
54                         continue
55                 }
56                 var locked bool
57                 err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
58                 if err != nil {
59                         logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
60                         conn.Close()
61                         dbl.mtx.Unlock()
62                         continue
63                 }
64                 if !locked {
65                         conn.Close()
66                         dbl.mtx.Unlock()
67                         continue
68                 }
69                 logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
70                 dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
71                 dbl.mtx.Unlock()
72                 return
73         }
74 }
75
76 // Check confirms that the lock is still active (i.e., the session is
77 // still alive), and re-acquires if needed. Panics if Lock is not
78 // acquired first.
79 func (dbl *DBLocker) Check() {
80         dbl.mtx.Lock()
81         err := dbl.conn.PingContext(dbl.ctx)
82         if err == nil {
83                 ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
84                 dbl.mtx.Unlock()
85                 return
86         }
87         ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
88         dbl.conn.Close()
89         dbl.conn = nil
90         ctx, getdb := dbl.ctx, dbl.getdb
91         dbl.mtx.Unlock()
92         dbl.Lock(ctx, getdb)
93 }
94
95 func (dbl *DBLocker) Unlock() {
96         dbl.mtx.Lock()
97         defer dbl.mtx.Unlock()
98         if dbl.conn != nil {
99                 _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
100                 if err != nil {
101                         ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
102                 } else {
103                         ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
104                 }
105                 dbl.conn.Close()
106                 dbl.conn = nil
107         }
108 }