Merge branch '19982-spot-instance' refs #19982
[arvados.git] / lib / controller / trash.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package controller
6
7 import (
8         "context"
9         "time"
10
11         "git.arvados.org/arvados.git/lib/controller/dblock"
12         "git.arvados.org/arvados.git/sdk/go/auth"
13         "git.arvados.org/arvados.git/sdk/go/ctxlog"
14 )
15
16 func (h *Handler) periodicWorker(workerName string, interval time.Duration, locker *dblock.DBLocker, run func(context.Context) error) {
17         logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", workerName)
18         ctx := ctxlog.Context(h.BackgroundContext, logger)
19         if interval <= 0 {
20                 logger.Debugf("interval is %v, not running worker", interval)
21                 return
22         }
23         if !locker.Lock(ctx, h.dbConnector.GetDB) {
24                 // context canceled
25                 return
26         }
27         defer locker.Unlock()
28         for ctxSleep(ctx, interval); ctx.Err() == nil; ctxSleep(ctx, interval) {
29                 if !locker.Check() {
30                         // context canceled
31                         return
32                 }
33                 err := run(ctx)
34                 if err != nil {
35                         logger.WithError(err).Infof("%s failed", workerName)
36                 }
37         }
38 }
39
40 func (h *Handler) trashSweepWorker() {
41         h.periodicWorker("trash sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.TrashSweep, func(ctx context.Context) error {
42                 ctx = auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
43                 _, err := h.federation.SysTrashSweep(ctx, struct{}{})
44                 return err
45         })
46 }
47
48 func (h *Handler) containerLogSweepWorker() {
49         // Since #21611 we don't expect any new log entries, so the
50         // periodic worker only runs once, then becomes a no-op.
51         //
52         // The old Containers.Logging.SweepInterval config is removed.
53         // We use TrashSweepInterval here instead, for testing
54         // reasons: it prevents the default integration-testing
55         // controller service (whose TrashSweepInterval is 0) from
56         // acquiring the dblock.
57         done := false
58         h.periodicWorker("container log sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
59                 if done {
60                         return nil
61                 }
62                 db, err := h.dbConnector.GetDB(ctx)
63                 if err != nil {
64                         return err
65                 }
66                 res, err := db.ExecContext(ctx, `
67 DELETE FROM logs
68  USING containers
69  WHERE logs.object_uuid=containers.uuid
70  AND logs.event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat', 'hoststat', 'node', 'container', 'keepstore')
71  AND containers.log IS NOT NULL`)
72                 if err != nil {
73                         return err
74                 }
75                 logger := ctxlog.FromContext(ctx)
76                 rows, err := res.RowsAffected()
77                 if err != nil {
78                         logger.WithError(err).Warn("unexpected error from RowsAffected()")
79                 } else {
80                         logger.WithField("rows", rows).Info("deleted rows from logs table")
81                         if rows == 0 {
82                                 done = true
83                         }
84                 }
85                 return nil
86         })
87 }
88
89 // Sleep for the given duration, but return early if ctx cancels
90 // before that.
91 func ctxSleep(ctx context.Context, d time.Duration) {
92         select {
93         case <-ctx.Done():
94         case <-time.After(d):
95         }
96 }