import (
"context"
"database/sql"
+ "fmt"
+ "net"
"sync"
"time"
ContainerLogSweep = &DBLocker{key: 10002}
KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
KeepBalanceActive = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+ Dispatch = &DBLocker{key: 10005} // any dispatcher running
+ RailsMigrations = &DBLocker{key: 10006}
retryDelay = 5 * time.Second
)
//
// Returns false if ctx is canceled before the lock is acquired.
func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
- logger := ctxlog.FromContext(ctx)
+ logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
+ var lastHeldBy string
for ; ; time.Sleep(retryDelay) {
dbl.mtx.Lock()
if dbl.conn != nil {
dbl.mtx.Unlock()
return false
} else if err != nil {
- logger.WithError(err).Infof("error getting database pool")
+ logger.WithError(err).Info("error getting database pool")
dbl.mtx.Unlock()
continue
}
if err == context.Canceled {
return false
} else if err != nil {
- logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
+ logger.WithError(err).Info("error getting pg_try_advisory_lock")
conn.Close()
dbl.mtx.Unlock()
continue
}
if !locked {
+ var host string
+ var port int
+ err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
+ (SELECT pid FROM pg_locks
+ WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
+ if err != nil {
+ logger.WithError(err).Info("error getting other client info")
+ } else {
+ heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
+ if lastHeldBy != heldBy {
+ logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
+ lastHeldBy = heldBy
+ }
+ }
conn.Close()
dbl.mtx.Unlock()
continue
}
- logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+ logger.Debug("acquired pg_advisory_lock")
dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
dbl.mtx.Unlock()
return true
dbl.mtx.Unlock()
return false
} else if err == nil {
- ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+ ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
dbl.mtx.Unlock()
return true
}
if dbl.conn != nil {
_, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
if err != nil {
- ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+ ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
} else {
- ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+ ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
}
dbl.conn.Close()
dbl.conn = nil