-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
import (
- "errors"
+ "context"
+ "sync"
+ "sync/atomic"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- log "github.com/Sirupsen/logrus"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
)
-// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
-// The channel will process trash list.
-// For each (next) trash request:
-// Delete the block indicated by the trash request Locator
-// Repeat
-//
-func RunTrashWorker(trashq *WorkQueue) {
- for item := range trashq.NextItem {
- trashRequest := item.(TrashRequest)
- TrashItem(trashRequest)
- trashq.DoneItem <- struct{}{}
- }
+type TrashListItem struct {
+ Locator string `json:"locator"`
+ BlockMtime int64 `json:"block_mtime"`
+ MountUUID string `json:"mount_uuid"` // Target mount, or "" for "everywhere"
+}
+
+type trasher struct {
+ keepstore *keepstore
+ todo []TrashListItem
+ cond *sync.Cond // lock guards todo accesses; cond broadcasts when todo becomes non-empty
+ inprogress atomic.Int64
}
-// TrashItem deletes the indicated block from every writable volume.
-func TrashItem(trashRequest TrashRequest) {
- reqMtime := time.Unix(0, trashRequest.BlockMtime)
- if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
- log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
- arvados.Duration(time.Since(reqMtime)),
- trashRequest.Locator,
- trashRequest.BlockMtime,
- reqMtime,
- theConfig.BlobSignatureTTL)
- return
+func newTrasher(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *trasher {
+ t := &trasher{
+ keepstore: keepstore,
+ cond: sync.NewCond(&sync.Mutex{}),
+ }
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "trash_queue_pending_entries",
+ Help: "Number of queued trash requests",
+ },
+ func() float64 {
+ t.cond.L.Lock()
+ defer t.cond.L.Unlock()
+ return float64(len(t.todo))
+ },
+ ))
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "trash_queue_inprogress_entries",
+ Help: "Number of trash requests in progress",
+ },
+ func() float64 {
+ return float64(t.inprogress.Load())
+ },
+ ))
+ if !keepstore.cluster.Collections.BlobTrash {
+ keepstore.logger.Info("not running trash worker because Collections.BlobTrash == false")
+ return t
}
- for _, volume := range KeepVM.AllWritable() {
- mtime, err := volume.Mtime(trashRequest.Locator)
- if err != nil {
- log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
- continue
+ var mntsAllowTrash []*mount
+ for _, mnt := range t.keepstore.mounts {
+ if mnt.AllowTrash {
+ mntsAllowTrash = append(mntsAllowTrash, mnt)
}
- if trashRequest.BlockMtime != mtime.UnixNano() {
- log.Printf("%v Delete(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
- continue
+ }
+ if len(mntsAllowTrash) == 0 {
+ t.keepstore.logger.Info("not running trash worker because there are no writable or trashable volumes")
+ } else {
+ for i := 0; i < keepstore.cluster.Collections.BlobTrashConcurrency; i++ {
+ go t.runWorker(ctx, mntsAllowTrash)
}
+ }
+ return t
+}
- if !theConfig.EnableDelete {
- err = errors.New("did not delete block because EnableDelete is false")
- } else {
- err = volume.Trash(trashRequest.Locator)
- }
+func (t *trasher) SetTrashList(newlist []TrashListItem) {
+ t.cond.L.Lock()
+ t.todo = newlist
+ t.cond.L.Unlock()
+ t.cond.Broadcast()
+}
- if err != nil {
- log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
- } else {
- log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
+func (t *trasher) runWorker(ctx context.Context, mntsAllowTrash []*mount) {
+ go func() {
+ <-ctx.Done()
+ t.cond.Broadcast()
+ }()
+ for {
+ t.cond.L.Lock()
+ for len(t.todo) == 0 && ctx.Err() == nil {
+ t.cond.Wait()
}
+ if ctx.Err() != nil {
+ t.cond.L.Unlock()
+ return
+ }
+ item := t.todo[0]
+ t.todo = t.todo[1:]
+ t.inprogress.Add(1)
+ t.cond.L.Unlock()
+
+ func() {
+ defer t.inprogress.Add(-1)
+ logger := t.keepstore.logger.WithField("locator", item.Locator)
+
+ li, err := getLocatorInfo(item.Locator)
+ if err != nil {
+ logger.Warn("ignoring trash request for invalid locator")
+ return
+ }
+
+ reqMtime := time.Unix(0, item.BlockMtime)
+ if time.Since(reqMtime) < t.keepstore.cluster.Collections.BlobSigningTTL.Duration() {
+ logger.Warnf("client asked to delete a %v old block (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+ arvados.Duration(time.Since(reqMtime)),
+ item.BlockMtime,
+ reqMtime,
+ t.keepstore.cluster.Collections.BlobSigningTTL)
+ return
+ }
+
+ var mnts []*mount
+ if item.MountUUID == "" {
+ mnts = mntsAllowTrash
+ } else if mnt := t.keepstore.mounts[item.MountUUID]; mnt == nil {
+ logger.Warnf("ignoring trash request for nonexistent mount %s", item.MountUUID)
+ return
+ } else if !mnt.AllowTrash {
+ logger.Warnf("ignoring trash request for readonly mount %s with AllowTrashWhenReadOnly==false", item.MountUUID)
+ return
+ } else {
+ mnts = []*mount{mnt}
+ }
+
+ for _, mnt := range mnts {
+ logger := logger.WithField("mount", mnt.UUID)
+ mtime, err := mnt.Mtime(li.hash)
+ if err != nil {
+ logger.WithError(err).Error("error getting stored mtime")
+ continue
+ }
+ if !mtime.Equal(reqMtime) {
+ logger.Infof("stored mtime (%v) does not match trash list mtime (%v); skipping", mtime, reqMtime)
+ continue
+ }
+ err = mnt.BlockTrash(li.hash)
+ if err != nil {
+ logger.WithError(err).Info("error trashing block")
+ continue
+ }
+ logger.Info("block trashed")
+ }
+ }()
}
}
+
+type trashEmptier struct{}
+
+func newTrashEmptier(ctx context.Context, ks *keepstore, reg *prometheus.Registry) *trashEmptier {
+ d := ks.cluster.Collections.BlobTrashCheckInterval.Duration()
+ if d <= 0 ||
+ !ks.cluster.Collections.BlobTrash ||
+ ks.cluster.Collections.BlobDeleteConcurrency <= 0 {
+ ks.logger.Infof("not running trash emptier because disabled by config (enabled=%t, interval=%v, concurrency=%d)", ks.cluster.Collections.BlobTrash, d, ks.cluster.Collections.BlobDeleteConcurrency)
+ return &trashEmptier{}
+ }
+ go func() {
+ ticker := time.NewTicker(d)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ }
+ for _, mnt := range ks.mounts {
+ if mnt.KeepMount.AllowTrash {
+ mnt.volume.EmptyTrash()
+ }
+ }
+ }
+ }()
+ return &trashEmptier{}
+}