1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/sdk/go/arvados"
14 "github.com/prometheus/client_golang/prometheus"
// TrashListItem is one trash request: which block to trash, the
// modification time the requester expects that block to have, and
// which mount to trash it on. Fields are (de)serialized as JSON using
// the tags below.
17 type TrashListItem struct {
// Locator identifies the block. runWorker parses it with
// getLocatorInfo and ignores the request if parsing fails.
18 Locator string `json:"locator"`
// BlockMtime is the expected block mtime, interpreted by runWorker as
// nanoseconds since the Unix epoch (time.Unix(0, BlockMtime)).
19 BlockMtime int64 `json:"block_mtime"`
// MountUUID selects a single mount by UUID. When it is empty,
// runWorker uses every mount in its mntsAllowTrash list.
20 MountUUID string `json:"mount_uuid"` // Target mount, or "" for "everywhere"
26 cond *sync.Cond // lock guards todo accesses; cond broadcasts when todo becomes non-empty
// inprogress is the value reported by the
// trash_queue_inprogress_entries gauge. runWorker decrements it with
// a deferred Add(-1).
// NOTE(review): the matching increment is not visible in this view;
// presumably it happens when a worker picks up an item -- confirm.
27 inprogress atomic.Int64
// newTrasher creates a trasher for the given keepstore, registers two
// gauges with reg, and starts the trash worker goroutines.
//
// Gauges registered (subsystem "keepstore"):
//   - trash_queue_pending_entries: len(t.todo)
//   - trash_queue_inprogress_entries: t.inprogress
//
// Workers: Collections.BlobTrashConcurrency goroutines are started,
// each running runWorker with ctx and the list of mounts collected in
// mntsAllowTrash. If Collections.BlobTrash is false, or mntsAllowTrash
// is empty, an Info message is logged saying the trash worker is not
// being run.
// NOTE(review): the early returns that follow those log messages are
// not visible in this view -- confirm.
//
// reg.MustRegister is a Must-style call; by Go convention such calls
// panic on failure (e.g. duplicate registration).
30 func newTrasher(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *trasher {
33 cond: sync.NewCond(&sync.Mutex{}),
// Gauge: number of trash requests still waiting in t.todo.
35 reg.MustRegister(prometheus.NewGaugeFunc(
38 Subsystem: "keepstore",
39 Name: "trash_queue_pending_entries",
40 Help: "Number of queued trash requests",
// t.cond.L guards t.todo, so it is released only after len(t.todo)
// has been read.
44 defer t.cond.L.Unlock()
45 return float64(len(t.todo))
// Gauge: number of trash requests currently being processed.
48 reg.MustRegister(prometheus.NewGaugeFunc(
51 Subsystem: "keepstore",
52 Name: "trash_queue_inprogress_entries",
53 Help: "Number of trash requests in progress",
// inprogress is an atomic counter, so no lock is needed to read it.
56 return float64(t.inprogress.Load())
// Trashing is disabled cluster-wide by configuration.
59 if !keepstore.cluster.Collections.BlobTrash {
60 keepstore.logger.Info("not running trash worker because Collections.BlobTrash == false")
// Collect the mounts that workers may trash blocks on.
// NOTE(review): the filter condition inside this loop is not visible
// here; presumably it tests mnt.AllowTrash -- confirm.
64 var mntsAllowTrash []*mount
65 for _, mnt := range t.keepstore.mounts {
67 mntsAllowTrash = append(mntsAllowTrash, mnt)
70 if len(mntsAllowTrash) == 0 {
71 t.keepstore.logger.Info("not running trash worker because there are no writable or trashable volumes")
// Start one worker goroutine per configured unit of concurrency. All
// workers share ctx and the same mntsAllowTrash slice.
73 for i := 0; i < keepstore.cluster.Collections.BlobTrashConcurrency; i++ {
74 go t.runWorker(ctx, mntsAllowTrash)
// SetTrashList accepts a new list of trash requests for this trasher.
// NOTE(review): the method body is not visible in this view.
// Presumably it replaces t.todo while holding t.cond.L and broadcasts
// on t.cond so waiting workers wake up -- confirm against the body.
80 func (t *trasher) SetTrashList(newlist []TrashListItem) {
// runWorker is the body of one trash worker goroutine. For each item
// it takes from t.todo it:
//
//  1. parses the locator, ignoring the request if it is invalid;
//  2. refuses the request if the requested mtime is more recent than
//     Collections.BlobSigningTTL ago;
//  3. picks the target mounts: all of mntsAllowTrash when
//     item.MountUUID is empty, otherwise the single named mount, which
//     must exist and have AllowTrash set;
//  4. on each target mount, trashes the block only if the stored
//     mtime equals the requested mtime.
//
// Failures are logged and do not stop the worker from moving on to
// other mounts.
// NOTE(review): the continue/return statements after each log call
// are not visible in this view -- confirm the exact control flow.
87 func (t *trasher) runWorker(ctx context.Context, mntsAllowTrash []*mount) {
// Keep waiting while the queue is empty and ctx has not been
// cancelled.
94 for len(t.todo) == 0 && ctx.Err() == nil {
// Undo the in-progress count when the enclosing function returns.
107 defer t.inprogress.Add(-1)
108 logger := t.keepstore.logger.WithField("locator", item.Locator)
110 li, err := getLocatorInfo(item.Locator)
112 logger.Warn("ignoring trash request for invalid locator")
// BlockMtime is a count of nanoseconds since the Unix epoch.
116 reqMtime := time.Unix(0, item.BlockMtime)
// Refuse to trash a block whose requested mtime is younger than the
// signing TTL.
117 if time.Since(reqMtime) < t.keepstore.cluster.Collections.BlobSigningTTL.Duration() {
// NOTE(review): the message says "blobSignatureTTL" but the value
// compared and printed is Collections.BlobSigningTTL.
118 logger.Warnf("client asked to delete a %v old block (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
119 arvados.Duration(time.Since(reqMtime)),
122 t.keepstore.cluster.Collections.BlobSigningTTL)
// Resolve which mounts this request applies to.
127 if item.MountUUID == "" {
128 mnts = mntsAllowTrash
129 } else if mnt := t.keepstore.mounts[item.MountUUID]; mnt == nil {
130 logger.Warnf("ignoring trash request for nonexistent mount %s", item.MountUUID)
132 } else if !mnt.AllowTrash {
133 logger.Warnf("ignoring trash request for readonly mount %s with AllowTrashWhenReadOnly==false", item.MountUUID)
// Try each selected mount independently.
139 for _, mnt := range mnts {
// Shadow logger so every message below also carries the mount UUID.
140 logger := logger.WithField("mount", mnt.UUID)
141 mtime, err := mnt.Mtime(li.hash)
143 logger.WithError(err).Error("error getting stored mtime")
// Only trash when the stored mtime is exactly the one the requester
// saw; a different mtime results in the "skipping" message.
146 if !mtime.Equal(reqMtime) {
147 logger.Infof("stored mtime (%v) does not match trash list mtime (%v); skipping", mtime, reqMtime)
150 err = mnt.BlockTrash(li.hash)
// NOTE(review): this failure is logged at Info level, whereas the
// Mtime failure above is logged at Error level -- confirm this is
// intentional.
152 logger.WithError(err).Info("error trashing block")
155 logger.Info("block trashed")
// trashEmptier is the (stateless, empty-struct) value returned by
// newTrashEmptier.
161 type trashEmptier struct{}
// newTrashEmptier sets up periodic trash emptying for ks and returns a
// trashEmptier.
//
// If the feature is disabled by configuration, it logs an Info
// message and returns an empty trashEmptier right away. The visible
// disabling conditions are Collections.BlobTrash == false and
// Collections.BlobDeleteConcurrency <= 0.
// NOTE(review): the log message also reports the interval, so there
// is presumably a check on d on a line not visible here -- confirm.
//
// Otherwise it creates a ticker with period
// Collections.BlobTrashCheckInterval and calls EmptyTrash on the
// volume of every mount whose KeepMount.AllowTrash is true.
// NOTE(review): the goroutine/select that ties the ticker and ctx to
// the mount loop is not visible in this view; reg is not used in any
// visible line.
163 func newTrashEmptier(ctx context.Context, ks *keepstore, reg *prometheus.Registry) *trashEmptier {
// d is the configured interval between trash-emptying passes.
164 d := ks.cluster.Collections.BlobTrashCheckInterval.Duration()
166 !ks.cluster.Collections.BlobTrash ||
167 ks.cluster.Collections.BlobDeleteConcurrency <= 0 {
168 ks.logger.Infof("not running trash emptier because disabled by config (enabled=%t, interval=%v, concurrency=%d)", ks.cluster.Collections.BlobTrash, d, ks.cluster.Collections.BlobDeleteConcurrency)
169 return &trashEmptier{}
172 ticker := time.NewTicker(d)
// One emptying pass: visit every mount and empty trash on those that
// allow trashing.
179 for _, mnt := range ks.mounts {
180 if mnt.KeepMount.AllowTrash {
181 mnt.volume.EmptyTrash()
186 return &trashEmptier{}