1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
11 "git.arvados.org/arvados.git/lib/controller/dblock"
12 "git.arvados.org/arvados.git/sdk/go/auth"
13 "git.arvados.org/arvados.git/sdk/go/ctxlog"
16 func (h *Handler) periodicWorker(workerName string, interval time.Duration, locker *dblock.DBLocker, run func(context.Context) error) {
17 logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", workerName)
18 ctx := ctxlog.Context(h.BackgroundContext, logger)
20 logger.Debugf("interval is %v, not running worker", interval)
23 if !locker.Lock(ctx, h.dbConnector.GetDB) {
28 for ctxSleep(ctx, interval); ctx.Err() == nil; ctxSleep(ctx, interval) {
35 logger.WithError(err).Infof("%s failed", workerName)
40 func (h *Handler) trashSweepWorker() {
41 h.periodicWorker("trash sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.TrashSweep, func(ctx context.Context) error {
42 ctx = auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
43 _, err := h.federation.SysTrashSweep(ctx, struct{}{})
48 func (h *Handler) containerLogSweepWorker() {
49 // Since #21611 we don't expect any new log entries, so the
50 // periodic worker only runs once, then becomes a no-op.
52 // The old Containers.Logging.SweepInterval config is removed.
53 // We use TrashSweepInterval here instead, for testing
54 // reasons: it prevents the default integration-testing
55 // controller service (whose TrashSweepInterval is 0) from
56 // acquiring the dblock.
58 h.periodicWorker("container log sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
62 db, err := h.dbConnector.GetDB(ctx)
66 res, err := db.ExecContext(ctx, `
69 WHERE logs.object_uuid=containers.uuid
70 AND logs.event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat', 'hoststat', 'node', 'container', 'keepstore')
71 AND containers.log IS NOT NULL`)
75 logger := ctxlog.FromContext(ctx)
76 rows, err := res.RowsAffected()
78 logger.WithError(err).Warn("unexpected error from RowsAffected()")
80 logger.WithField("rows", rows).Info("deleted rows from logs table")
89 // Sleep for the given duration, but return early if ctx cancels
91 func ctxSleep(ctx context.Context, d time.Duration) {