18071: Use dblock to avoid concurrent keep-balance ops.
[arvados.git] / lib / controller / dblock / dblock.go
index a46201bb45af793062126689d773be61f9bbe232..472633747c5d9ba6d832777de704fc3cd25fa387 100644 (file)
@@ -15,9 +15,11 @@ import (
 )
 
 var (
-       TrashSweep        = &DBLocker{key: 10001}
-       ContainerLogSweep = &DBLocker{key: 10002}
-       retryDelay        = 5 * time.Second
+       TrashSweep         = &DBLocker{key: 10001}
+       ContainerLogSweep  = &DBLocker{key: 10002}
+       KeepBalanceService = &DBLocker{key: 10003} // held by a keep-balance service while it runs its periodic-sweep loop
+       KeepBalanceActive  = &DBLocker{key: 10004} // held while a keep-balance sweep is in progress (either -once=true or service loop)
+       retryDelay         = 5 * time.Second       // pause between attempts in Lock's retry loop
 )
 
 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
@@ -31,7 +33,9 @@ type DBLocker struct {
 }
 
 // Lock acquires the advisory lock, waiting/reconnecting if needed.
-func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
+//
+// Returns false if ctx is canceled before the lock is acquired.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
        logger := ctxlog.FromContext(ctx)
        for ; ; time.Sleep(retryDelay) {
                dbl.mtx.Lock()
@@ -41,21 +45,33 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
                        dbl.mtx.Unlock()
                        continue
                }
+               if ctx.Err() != nil {
+                       dbl.mtx.Unlock()
+                       return false
+               }
                db, err := getdb(ctx)
-               if err != nil {
+               if err == context.Canceled {
+                       dbl.mtx.Unlock()
+                       return false
+               } else if err != nil {
                        logger.WithError(err).Infof("error getting database pool")
                        dbl.mtx.Unlock()
                        continue
                }
                conn, err := db.Conn(ctx)
-               if err != nil {
+               if err == context.Canceled {
+                       dbl.mtx.Unlock()
+                       return false
+               } else if err != nil {
                        logger.WithError(err).Info("error getting database connection")
                        dbl.mtx.Unlock()
                        continue
                }
                var locked bool
                err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
-               if err != nil {
+               if err == context.Canceled {
+                       return false
+               } else if err != nil {
                        logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
                        conn.Close()
                        dbl.mtx.Unlock()
@@ -69,27 +85,33 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
                logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
                dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
                dbl.mtx.Unlock()
-               return
+               return true
        }
 }
 
 // Check confirms that the lock is still active (i.e., the session is
 // still alive), and re-acquires if needed. Panics if Lock is not
 // acquired first.
-func (dbl *DBLocker) Check() {
+//
+// Returns true if the lock is confirmed or re-acquired, false if the
+// context passed to Lock() is canceled first.
+func (dbl *DBLocker) Check() bool {
         dbl.mtx.Lock()
         err := dbl.conn.PingContext(dbl.ctx)
-       if err == nil {
+       if err == context.Canceled { // ping aborted because the context passed to Lock() was canceled
+               dbl.mtx.Unlock()
+               return false
+       } else if err == nil {
                 ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
                 dbl.mtx.Unlock()
-               return
+               return true
         }
         ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
         dbl.conn.Close()
         dbl.conn = nil
         ctx, getdb := dbl.ctx, dbl.getdb
         dbl.mtx.Unlock()
-       dbl.Lock(ctx, getdb)
+       return dbl.Lock(ctx, getdb) // start over with the saved ctx/getdb; Lock's result is passed through
 }
 
 func (dbl *DBLocker) Unlock() {