package controller
import (
+ "context"
"time"
"git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
)
-func (h *Handler) trashSweepWorker() {
- sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
- logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+func (h *Handler) periodicWorker(workerName string, interval time.Duration, locker *dblock.DBLocker, run func(context.Context) error) {
+ logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", workerName)
ctx := ctxlog.Context(h.BackgroundContext, logger)
- if sleep <= 0 {
- logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+ if interval <= 0 {
+ logger.Debugf("interval is %v, not running worker", interval)
+ return
+ }
+ if !locker.Lock(ctx, h.dbConnector.GetDB) {
+ // context canceled
return
}
- dblock.TrashSweep.Lock(ctx, h.db)
- defer dblock.TrashSweep.Unlock()
- for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
- dblock.TrashSweep.Check()
- ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+ defer locker.Unlock()
+ for ctxSleep(ctx, interval); ctx.Err() == nil; ctxSleep(ctx, interval) {
+ if !locker.Check() {
+ // context canceled
+ return
+ }
+ err := run(ctx)
+ if err != nil {
+ logger.WithError(err).Infof("%s failed", workerName)
+ }
+ }
+}
+
+func (h *Handler) trashSweepWorker() {
+ h.periodicWorker("trash sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.TrashSweep, func(ctx context.Context) error {
+ ctx = auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
_, err := h.federation.SysTrashSweep(ctx, struct{}{})
+ return err
+ })
+}
+
+func (h *Handler) containerLogSweepWorker() {
+ // Since #21611 we don't expect any new log entries, so the
+ // periodic worker only runs once, then becomes a no-op.
+ //
+ // The old Containers.Logging.SweepInterval config is removed.
+ // We use TrashSweepInterval here instead, for testing
+ // reasons: it prevents the default integration-testing
+ // controller service (whose TrashSweepInterval is 0) from
+ // acquiring the dblock.
+ done := false
+ h.periodicWorker("container log sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
+ if done {
+ return nil
+ }
+ db, err := h.dbConnector.GetDB(ctx)
+ if err != nil {
+ return err
+ }
+ res, err := db.ExecContext(ctx, `
+DELETE FROM logs
+ USING containers
+ WHERE logs.object_uuid=containers.uuid
+ AND logs.event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat', 'hoststat', 'node', 'container', 'keepstore')
+ AND containers.log IS NOT NULL`)
if err != nil {
- logger.WithError(err).Info("trash sweep failed")
+ return err
}
+ logger := ctxlog.FromContext(ctx)
+ rows, err := res.RowsAffected()
+ if err != nil {
+ logger.WithError(err).Warn("unexpected error from RowsAffected()")
+ } else {
+ logger.WithField("rows", rows).Info("deleted rows from logs table")
+ if rows == 0 {
+ done = true
+ }
+ }
+ return nil
+ })
+}
+
+// Sleep for the given duration, but return early if ctx cancels
+// before that.
+func ctxSleep(ctx context.Context, d time.Duration) {
+ select {
+ case <-ctx.Done():
+ case <-time.After(d):
}
}