--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dblock
+
+import (
+ "context"
+ "database/sql"
+ "sync"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
+)
+
+var (
+	// TrashSweep is the cluster-wide lock held by the controller's
+	// trash sweep worker.
+	//
+	// Lock keys should be added here with explicit values, to
+	// ensure they do not get accidentally renumbered when a key is
+	// added or removed.
+	TrashSweep = &DBLocker{key: 10001}
+
+	// retryDelay is how long Lock waits between attempts to get a
+	// database pool, connection, or advisory lock.
+	retryDelay = 5 * time.Second
+)
+
+// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
+// a long-running task like "do X every N seconds".
+//
+// The ctx, getdb, and conn fields are only accessed while mtx is held.
+type DBLocker struct {
+	// key is the pg_advisory_lock key identifying this lock.
+	key int
+
+	// mtx serializes Lock, Check, and Unlock within this process.
+	mtx sync.Mutex
+
+	// ctx and getdb are remembered from the most recent successful
+	// Lock call, so that Check can re-acquire the lock after the
+	// database session is lost.
+	ctx   context.Context
+	getdb func(context.Context) (*sqlx.DB, error)
+
+	// conn is the session holding the advisory lock. It is set once
+	// the lock has been acquired, and reset to nil by Unlock or by
+	// Check when the session is found to be dead.
+	conn *sql.Conn
+}
+
+// Lock acquires the advisory lock, waiting/reconnecting if needed.
+//
+// If another caller in this process already holds the lock, Lock
+// waits for it to be released. Errors getting a database pool, a
+// connection, or the advisory lock itself are logged and retried
+// after retryDelay.
+//
+// If ctx is done before the lock is acquired, Lock gives up and
+// returns without the lock; callers can detect this by checking
+// ctx.Err() after Lock returns.
+//
+// On success, ctx and getdb are remembered so Check can re-acquire
+// the lock if the database session is lost.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
+	logger := ctxlog.FromContext(ctx)
+	for ; ctx.Err() == nil; waitForRetry(ctx) {
+		dbl.mtx.Lock()
+		if dbl.conn != nil {
+			// Already locked by another caller in this
+			// process. Wait for them to release.
+			dbl.mtx.Unlock()
+			continue
+		}
+		// mtx stays held through pg_advisory_lock below, which
+		// waits until no other session holds the key; other
+		// callers in this process wait on mtx meanwhile.
+		db, err := getdb(ctx)
+		if err != nil {
+			logger.WithError(err).Infof("error getting database pool")
+			dbl.mtx.Unlock()
+			continue
+		}
+		conn, err := db.Conn(ctx)
+		if err != nil {
+			logger.WithError(err).Info("error getting database connection")
+			dbl.mtx.Unlock()
+			continue
+		}
+		_, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.key)
+		if err != nil {
+			logger.WithError(err).Infof("error getting pg_advisory_lock %d", dbl.key)
+			conn.Close()
+			dbl.mtx.Unlock()
+			continue
+		}
+		logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+		dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
+		dbl.mtx.Unlock()
+		return
+	}
+	logger.WithError(ctx.Err()).Debugf("gave up waiting for pg_advisory_lock %d", dbl.key)
+}
+
+// waitForRetry pauses for retryDelay, returning early if ctx is done.
+func waitForRetry(ctx context.Context) {
+	timer := time.NewTimer(retryDelay)
+	defer timer.Stop()
+	select {
+	case <-ctx.Done():
+	case <-timer.C:
+	}
+}
+
+// Check confirms that the lock is still active (i.e., the session is
+// still alive), and re-acquires it if needed.
+//
+// Check panics if Lock has never succeeded on dbl, because there is
+// then no remembered context or getdb function to re-acquire with.
+// If the lock is not currently held (e.g., after Unlock), Check
+// acquires it again instead of dereferencing a nil connection.
+func (dbl *DBLocker) Check() {
+	dbl.mtx.Lock()
+	if dbl.getdb == nil {
+		dbl.mtx.Unlock()
+		panic("dblock: Check called before Lock acquired the lock")
+	}
+	if dbl.conn != nil {
+		err := dbl.conn.PingContext(dbl.ctx)
+		if err == nil {
+			ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+			dbl.mtx.Unlock()
+			return
+		}
+		ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
+		dbl.conn.Close()
+		dbl.conn = nil
+	}
+	// Release mtx before calling Lock, which acquires it itself.
+	ctx, getdb := dbl.ctx, dbl.getdb
+	dbl.mtx.Unlock()
+	dbl.Lock(ctx, getdb)
+}
+
+// Unlock releases the advisory lock with pg_advisory_unlock and
+// closes the session that held it. It does nothing if the lock is not
+// currently held by this process.
+func (dbl *DBLocker) Unlock() {
+	dbl.mtx.Lock()
+	defer dbl.mtx.Unlock()
+	conn := dbl.conn
+	if conn == nil {
+		return
+	}
+	logger := ctxlog.FromContext(dbl.ctx)
+	// Use a fresh context rather than dbl.ctx, so the unlock is still
+	// attempted when the context given to Lock has been canceled
+	// (e.g., a worker shutting down).
+	if _, err := conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key); err != nil {
+		logger.WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+	} else {
+		logger.Debugf("released pg_advisory_lock %d", dbl.key)
+	}
+	conn.Close()
+	dbl.conn = nil
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "time"
+
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+)
+
+// trashSweepWorker waits for the cluster-wide dblock.TrashSweep lock,
+// then repeatedly waits Collections.TrashSweepInterval and calls
+// SysTrashSweep using the cluster's SystemRootToken. The wait between
+// sweeps ends early when h.BackgroundContext is done, at which point
+// the worker releases the lock and returns. If the interval is not
+// positive, the worker does nothing.
+func (h *Handler) trashSweepWorker() {
+	sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
+	logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+	ctx := ctxlog.Context(h.BackgroundContext, logger)
+	if sleep <= 0 {
+		logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+		return
+	}
+	dblock.TrashSweep.Lock(ctx, h.db)
+	defer dblock.TrashSweep.Unlock()
+	// Use a timer rather than time.Sleep so cancellation is noticed
+	// promptly instead of after up to a full interval.
+	timer := time.NewTimer(sleep)
+	defer timer.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-timer.C:
+		}
+		// select chooses randomly when both cases are ready, so
+		// re-check before starting a sweep.
+		if ctx.Err() != nil {
+			return
+		}
+		dblock.TrashSweep.Check()
+		sweepCtx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+		if _, err := h.federation.SysTrashSweep(sweepCtx, struct{}{}); err != nil {
+			logger.WithError(err).Info("trash sweep failed")
+		}
+		// Start the next interval only after this sweep finishes.
+		timer.Reset(sleep)
+	}
+}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package controller
-
-import (
- "context"
- "database/sql"
- "time"
-
- "git.arvados.org/arvados.git/sdk/go/auth"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
- "github.com/jmoiron/sqlx"
-)
-
-const (
- // lock keys should be added here with explicit values, to
- // ensure they do not get accidentally renumbered when a key
- // is added or removed.
- // lockKeyTrashSweep is the advisory lock key used by trashSweepWorker.
- lockKeyTrashSweep = 10001
-)
-
-// dbLocker uses pg_advisory_lock to maintain a cluster-wide lock for
-// a long-running task like "do X every N seconds".
-type dbLocker struct {
- // GetDB returns the database pool used to open the lock session.
- GetDB func(context.Context) (*sqlx.DB, error)
- // LockKey is the key passed to pg_advisory_lock.
- LockKey int
-
- conn *sql.Conn // != nil if advisory lock is acquired
-}
-
-// Lock acquires the advisory lock the first time it is
-// called. Subsequent calls confirm that the lock is still active
-// (i.e., the session is still alive), and re-acquires if needed.
-//
-// Failures are logged and retried every 5 seconds with no limit.
-// NOTE(review): ctx is never checked for cancellation, so once ctx is
-// done this presumably retries forever (db.Conn(ctx) keeps failing).
-func (dbl *dbLocker) Lock(ctx context.Context) {
- logger := ctxlog.FromContext(ctx)
- for ; ; time.Sleep(5 * time.Second) {
- if dbl.conn == nil {
- db, err := dbl.GetDB(ctx)
- if err != nil {
- logger.WithError(err).Infof("error getting database pool")
- continue
- }
- conn, err := db.Conn(ctx)
- if err != nil {
- logger.WithError(err).Info("error getting database connection")
- continue
- }
- _, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.LockKey)
- if err != nil {
- logger.WithError(err).Info("error getting lock")
- conn.Close()
- continue
- }
- dbl.conn = conn
- }
- // Confirm the session holding the lock is still alive; if
- // not, drop it and retry acquisition on the next iteration.
- err := dbl.conn.PingContext(ctx)
- if err != nil {
- logger.WithError(err).Info("database connection ping failed")
- dbl.conn.Close()
- dbl.conn = nil
- continue
- }
- return
- }
-}
-
-// Unlock closes the connection holding the advisory lock, if any.
-//
-// NOTE(review): pg_advisory_unlock is never called here. If Close
-// returns the connection to the pool rather than ending the session,
-// the session-level lock presumably remains held — confirm.
-func (dbl *dbLocker) Unlock() {
- if dbl.conn != nil {
- dbl.conn.Close()
- dbl.conn = nil
- }
-}
-
-// trashSweepWorker waits for the cluster-wide trash sweep lock, then
-// repeatedly sleeps Collections.TrashSweepInterval and calls
-// SysTrashSweep using the cluster's SystemRootToken, until
-// h.BackgroundContext is done. It does nothing if the interval is not
-// positive.
-func (h *Handler) trashSweepWorker() {
- sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
- logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
- ctx := ctxlog.Context(h.BackgroundContext, logger)
- if sleep <= 0 {
- logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
- return
- }
- locker := &dbLocker{GetDB: h.db, LockKey: lockKeyTrashSweep}
- locker.Lock(ctx)
- defer locker.Unlock()
- for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
- // Confirm (or re-acquire) the lock before each sweep.
- locker.Lock(ctx)
- ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
- _, err := h.federation.SysTrashSweep(ctx, struct{}{})
- if err != nil {
- logger.WithError(err).Info("trash sweep failed")
- }
- }
-}