2960: Merge branch 'main' into 2960-keepstore-streaming
[arvados.git] / services / keepstore / trash_worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "context"
9         "sync"
10         "sync/atomic"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "github.com/prometheus/client_golang/prometheus"
15 )
16
// TrashListItem is one entry of a trash list handed to SetTrashList: a
// request to move the block identified by Locator to the trash.
//
// BlockMtime is interpreted by the trash worker as nanoseconds since the
// Unix epoch (time.Unix(0, BlockMtime)); the block is only trashed on a
// mount whose stored mtime equals it. MountUUID names a single target
// mount; the empty string means every mount that allows trashing.
type TrashListItem struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`
	MountUUID  string `json:"mount_uuid"` // Target mount, or "" for "everywhere"
}
22
23 type trasher struct {
24         keepstore  *keepstore
25         todo       []TrashListItem
26         cond       *sync.Cond // lock guards todo accesses; cond broadcasts when todo becomes non-empty
27         inprogress atomic.Int64
28 }
29
30 func newTrasher(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *trasher {
31         t := &trasher{
32                 keepstore: keepstore,
33                 cond:      sync.NewCond(&sync.Mutex{}),
34         }
35         reg.MustRegister(prometheus.NewGaugeFunc(
36                 prometheus.GaugeOpts{
37                         Namespace: "arvados",
38                         Subsystem: "keepstore",
39                         Name:      "trash_queue_pending_entries",
40                         Help:      "Number of queued trash requests",
41                 },
42                 func() float64 {
43                         t.cond.L.Lock()
44                         defer t.cond.L.Unlock()
45                         return float64(len(t.todo))
46                 },
47         ))
48         reg.MustRegister(prometheus.NewGaugeFunc(
49                 prometheus.GaugeOpts{
50                         Namespace: "arvados",
51                         Subsystem: "keepstore",
52                         Name:      "trash_queue_inprogress_entries",
53                         Help:      "Number of trash requests in progress",
54                 },
55                 func() float64 {
56                         return float64(t.inprogress.Load())
57                 },
58         ))
59         if !keepstore.cluster.Collections.BlobTrash {
60                 keepstore.logger.Info("not running trash worker because Collections.BlobTrash == false")
61                 return t
62         }
63
64         var mntsAllowTrash []*mount
65         for _, mnt := range t.keepstore.mounts {
66                 if mnt.AllowTrash {
67                         mntsAllowTrash = append(mntsAllowTrash, mnt)
68                 }
69         }
70         if len(mntsAllowTrash) == 0 {
71                 t.keepstore.logger.Info("not running trash worker because there are no writable or trashable volumes")
72         } else {
73                 for i := 0; i < keepstore.cluster.Collections.BlobTrashConcurrency; i++ {
74                         go t.runWorker(ctx, mntsAllowTrash)
75                 }
76         }
77         return t
78 }
79
80 func (t *trasher) SetTrashList(newlist []TrashListItem) {
81         t.cond.L.Lock()
82         t.todo = newlist
83         t.cond.L.Unlock()
84         t.cond.Broadcast()
85 }
86
87 func (t *trasher) runWorker(ctx context.Context, mntsAllowTrash []*mount) {
88         go func() {
89                 <-ctx.Done()
90                 t.cond.Broadcast()
91         }()
92         for {
93                 t.cond.L.Lock()
94                 for len(t.todo) == 0 && ctx.Err() == nil {
95                         t.cond.Wait()
96                 }
97                 if ctx.Err() != nil {
98                         t.cond.L.Unlock()
99                         return
100                 }
101                 item := t.todo[0]
102                 t.todo = t.todo[1:]
103                 t.inprogress.Add(1)
104                 t.cond.L.Unlock()
105
106                 func() {
107                         defer t.inprogress.Add(-1)
108                         logger := t.keepstore.logger.WithField("locator", item.Locator)
109
110                         li, err := getLocatorInfo(item.Locator)
111                         if err != nil {
112                                 logger.Warn("ignoring trash request for invalid locator")
113                                 return
114                         }
115
116                         reqMtime := time.Unix(0, item.BlockMtime)
117                         if time.Since(reqMtime) < t.keepstore.cluster.Collections.BlobSigningTTL.Duration() {
118                                 logger.Warnf("client asked to delete a %v old block (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
119                                         arvados.Duration(time.Since(reqMtime)),
120                                         item.BlockMtime,
121                                         reqMtime,
122                                         t.keepstore.cluster.Collections.BlobSigningTTL)
123                                 return
124                         }
125
126                         var mnts []*mount
127                         if item.MountUUID == "" {
128                                 mnts = mntsAllowTrash
129                         } else if mnt := t.keepstore.mounts[item.MountUUID]; mnt == nil {
130                                 logger.Warnf("ignoring trash request for nonexistent mount %s", item.MountUUID)
131                                 return
132                         } else if !mnt.AllowTrash {
133                                 logger.Warnf("ignoring trash request for readonly mount %s with AllowTrashWhenReadOnly==false", item.MountUUID)
134                                 return
135                         } else {
136                                 mnts = []*mount{mnt}
137                         }
138
139                         for _, mnt := range mnts {
140                                 logger := logger.WithField("mount", mnt.UUID)
141                                 mtime, err := mnt.Mtime(li.hash)
142                                 if err != nil {
143                                         logger.WithError(err).Error("error getting stored mtime")
144                                         continue
145                                 }
146                                 if !mtime.Equal(reqMtime) {
147                                         logger.Infof("stored mtime (%v) does not match trash list mtime (%v); skipping", mtime, reqMtime)
148                                         continue
149                                 }
150                                 err = mnt.BlockTrash(li.hash)
151                                 if err != nil {
152                                         logger.WithError(err).Info("error trashing block")
153                                         continue
154                                 }
155                                 logger.Info("block trashed")
156                         }
157                 }()
158         }
159 }
160
161 type trashEmptier struct{}
162
163 func newTrashEmptier(ctx context.Context, ks *keepstore, reg *prometheus.Registry) *trashEmptier {
164         d := ks.cluster.Collections.BlobTrashCheckInterval.Duration()
165         if d <= 0 ||
166                 !ks.cluster.Collections.BlobTrash ||
167                 ks.cluster.Collections.BlobDeleteConcurrency <= 0 {
168                 ks.logger.Infof("not running trash emptier because disabled by config (enabled=%t, interval=%v, concurrency=%d)", ks.cluster.Collections.BlobTrash, d, ks.cluster.Collections.BlobDeleteConcurrency)
169                 return &trashEmptier{}
170         }
171         go func() {
172                 ticker := time.NewTicker(d)
173                 for {
174                         select {
175                         case <-ctx.Done():
176                                 return
177                         case <-ticker.C:
178                         }
179                         for _, mnt := range ks.mounts {
180                                 if mnt.KeepMount.AllowTrash {
181                                         mnt.volume.EmptyTrash()
182                                 }
183                         }
184                 }
185         }()
186         return &trashEmptier{}
187 }