import (
"context"
"database/sql"
+ "fmt"
+ "net"
"sync"
"time"
)
+// Cluster-wide advisory locks. Each DBLocker's key is the value Lock
+// passes to pg_try_advisory_lock, so two entries sharing a key would
+// contend for the same lock. retryDelay is how long Lock sleeps
+// between attempts.
var (
- TrashSweep = &DBLocker{key: 10001}
- ContainerLogSweep = &DBLocker{key: 10002}
- retryDelay = 5 * time.Second
+ TrashSweep = &DBLocker{key: 10001}
+ ContainerLogSweep = &DBLocker{key: 10002}
+ KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
+ KeepBalanceActive = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+ Dispatch = &DBLocker{key: 10005} // any dispatcher running
+ RailsMigrations = &DBLocker{key: 10006}
+ retryDelay = 5 * time.Second
)
// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
}
// Lock acquires the advisory lock, waiting/reconnecting if needed.
-func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
- logger := ctxlog.FromContext(ctx)
+//
+// getdb is called on every attempt to obtain the database pool. A
+// dedicated connection is taken from the pool and, once the lock is
+// acquired, stored in dbl.conn together with ctx and getdb so that
+// Check can reuse them. Attempts are spaced retryDelay apart. While
+// another session holds the lock, that session's client address is
+// logged once each time it changes.
+//
+// Returns false if ctx is canceled before the lock is acquired.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
+ logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
+ var lastHeldBy string
for ; ; time.Sleep(retryDelay) {
dbl.mtx.Lock()
+ // Check for cancellation before anything that can "continue",
+ // so a canceled caller is never left spinning while this
+ // process already holds the lock.
+ if ctx.Err() != nil {
+ dbl.mtx.Unlock()
+ return false
+ }
if dbl.conn != nil {
dbl.mtx.Unlock()
continue
}
db, err := getdb(ctx)
- if err != nil {
- logger.WithError(err).Infof("error getting database pool")
+ if err == context.Canceled {
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
+ logger.WithError(err).Info("error getting database pool")
dbl.mtx.Unlock()
continue
}
conn, err := db.Conn(ctx)
- if err != nil {
+ if err == context.Canceled {
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
logger.WithError(err).Info("error getting database connection")
dbl.mtx.Unlock()
continue
}
var locked bool
err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
- if err != nil {
- logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
+ if err == context.Canceled {
+ // Release the connection and the mutex on this exit path
+ // too; otherwise the connection leaks and every later
+ // Lock/Check call blocks on dbl.mtx forever.
+ conn.Close()
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
+ logger.WithError(err).Info("error getting pg_try_advisory_lock")
conn.Close()
dbl.mtx.Unlock()
continue
}
if !locked {
+ // Another session holds the lock. Look up its client
+ // address so the log says who we are waiting for.
+ var host string
+ var port int
+ err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
+ (SELECT pid FROM pg_locks
+ WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
+ if err != nil {
+ logger.WithError(err).Info("error getting other client info")
+ } else {
+ heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
+ // Only log when the holder changes, not on every retry.
+ if lastHeldBy != heldBy {
+ logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
+ lastHeldBy = heldBy
+ }
+ }
conn.Close()
dbl.mtx.Unlock()
continue
}
- logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+ logger.Debug("acquired pg_advisory_lock")
dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
dbl.mtx.Unlock()
- return
+ return true
}
}
// Check confirms that the lock is still active (i.e., the session is
// still alive), and re-acquires if needed. Panics if Lock is not
// acquired first.
-func (dbl *DBLocker) Check() {
+//
+// Returns false if the context passed to Lock() is canceled before
+// the lock is confirmed or reacquired.
+func (dbl *DBLocker) Check() bool {
dbl.mtx.Lock()
err := dbl.conn.PingContext(dbl.ctx)
- if err == nil {
- ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+ if err == context.Canceled {
+ dbl.mtx.Unlock()
+ return false
+ } else if err == nil {
+ ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
dbl.mtx.Unlock()
- return
+ return true
}
- ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
+ // Tag the failure with the lock ID, like the other log messages of
+ // this type, so it can be attributed to a specific lock.
+ ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("database connection ping failed")
+ // Drop the dead connection and release the mutex before calling
+ // Lock: Lock takes the mutex itself and waits while conn is non-nil.
dbl.conn.Close()
dbl.conn = nil
ctx, getdb := dbl.ctx, dbl.getdb
dbl.mtx.Unlock()
- dbl.Lock(ctx, getdb)
+ return dbl.Lock(ctx, getdb)
}
func (dbl *DBLocker) Unlock() {
if dbl.conn != nil {
_, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
if err != nil {
- ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+ ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
} else {
- ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+ ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
}
dbl.conn.Close()
dbl.conn = nil