+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
+)
+
const (
	// Advisory lock keys. Each one is used as a dbLocker.LockKey,
	// i.e., the argument passed to pg_advisory_lock.
	//
	// Lock keys should be added here with explicit values, to
	// ensure they do not get accidentally renumbered when a key
	// is added or removed.

	// lockKeyTrashSweep is held by whichever process is currently
	// running trash sweeps (see trashSweepWorker).
	lockKeyTrashSweep = 10001
)
+
// dbLocker uses pg_advisory_lock to maintain a cluster-wide lock for
// a long-running task like "do X every N seconds".
//
// The lock is held through a single dedicated connection (conn) taken
// from the pool returned by GetDB; Lock pings that connection to
// confirm the session, and therefore the lock, is still alive.
//
// dbLocker has no internal synchronization: Lock and Unlock read and
// write conn directly, so a given dbLocker should only be used from
// one goroutine at a time.
type dbLocker struct {
	// GetDB returns the database pool from which the dedicated lock
	// connection is obtained.
	GetDB func(context.Context) (*sqlx.DB, error)

	// LockKey is the pg_advisory_lock key (see the lockKey*
	// constants).
	LockKey int

	conn *sql.Conn // != nil if advisory lock is acquired
}
+
+// Lock acquires the advisory lock the first time it is
+// called. Subsequent calls confirm that the lock is still active
+// (i.e., the session is still alive), and re-acquires if needed.
+func (dbl *dbLocker) Lock(ctx context.Context) {
+ logger := ctxlog.FromContext(ctx)
+ for ; ; time.Sleep(5 * time.Second) {
+ if dbl.conn == nil {
+ db, err := dbl.GetDB(ctx)
+ if err != nil {
+ logger.WithError(err).Infof("error getting database pool")
+ continue
+ }
+ conn, err := db.Conn(ctx)
+ if err != nil {
+ logger.WithError(err).Info("error getting database connection")
+ continue
+ }
+ _, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.LockKey)
+ if err != nil {
+ logger.WithError(err).Info("error getting lock")
+ conn.Close()
+ continue
+ }
+ dbl.conn = conn
+ }
+ err := dbl.conn.PingContext(ctx)
+ if err != nil {
+ logger.WithError(err).Info("database connection ping failed")
+ dbl.conn.Close()
+ dbl.conn = nil
+ continue
+ }
+ return
+ }
+}
+
+func (dbl *dbLocker) Unlock() {
+ if dbl.conn != nil {
+ dbl.conn.Close()
+ dbl.conn = nil
+ }
+}
+
+func (h *Handler) trashSweepWorker() {
+ sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
+ logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+ ctx := ctxlog.Context(h.BackgroundContext, logger)
+ if sleep <= 0 {
+ logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+ return
+ }
+ locker := &dbLocker{GetDB: h.db, LockKey: lockKeyTrashSweep}
+ locker.Lock(ctx)
+ defer locker.Unlock()
+ for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
+ locker.Lock(ctx)
+ ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+ _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+ if err != nil {
+ logger.WithError(err).Info("trash sweep failed")
+ }
+ }
+}