X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f23f5689eac6354eb9567c91f2ff8586e2118e92..00c93619f7691c0828f5273bc457e2840dbdc084:/lib/controller/trash.go diff --git a/lib/controller/trash.go b/lib/controller/trash.go index 551b2f92bb..99e7aec0b6 100644 --- a/lib/controller/trash.go +++ b/lib/controller/trash.go @@ -5,6 +5,7 @@ package controller import ( + "context" "time" "git.arvados.org/arvados.git/lib/controller/dblock" @@ -12,22 +13,62 @@ import ( "git.arvados.org/arvados.git/sdk/go/ctxlog" ) -func (h *Handler) trashSweepWorker() { - sleep := h.Cluster.Collections.TrashSweepInterval.Duration() - logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep") +func (h *Handler) periodicWorker(workerName string, interval time.Duration, locker *dblock.DBLocker, run func(context.Context) error) { + logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", workerName) ctx := ctxlog.Context(h.BackgroundContext, logger) - if sleep <= 0 { - logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep) + if interval <= 0 { + logger.Debugf("interval is %v, not running worker", interval) return } - dblock.TrashSweep.Lock(ctx, h.db) - defer dblock.TrashSweep.Unlock() - for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) { - dblock.TrashSweep.Check() - ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}}) - _, err := h.federation.SysTrashSweep(ctx, struct{}{}) + if !locker.Lock(ctx, h.dbConnector.GetDB) { + // context canceled + return + } + defer locker.Unlock() + for time.Sleep(interval); ctx.Err() == nil; time.Sleep(interval) { + if !locker.Check() { + // context canceled + return + } + err := run(ctx) if err != nil { - logger.WithError(err).Info("trash sweep failed") + logger.WithError(err).Infof("%s failed", workerName) } } } + +func (h *Handler) trashSweepWorker() { + h.periodicWorker("trash sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.TrashSweep, func(ctx context.Context) error { + ctx = auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}}) + _, err := h.federation.SysTrashSweep(ctx, struct{}{}) + return err + }) +} + +func (h *Handler) containerLogSweepWorker() { + h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error { + db, err := h.dbConnector.GetDB(ctx) + if err != nil { + return err + } + res, err := db.ExecContext(ctx, ` +DELETE FROM logs + USING containers + WHERE logs.object_uuid=containers.uuid + AND logs.event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat', 'hoststat', 'node', 'container', 'keepstore') + AND containers.log IS NOT NULL + AND now() - containers.finished_at > $1::interval`, + h.Cluster.Containers.Logging.MaxAge.String()) + if err != nil { + return err + } + logger := ctxlog.FromContext(ctx) + rows, err := res.RowsAffected() + if err != nil { + logger.WithError(err).Warn("unexpected error from RowsAffected()") + } else { + logger.WithField("rows", rows).Info("deleted rows from logs table") + } + return nil + }) +}