X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/df5c912a9eb5af7222e5446bc437ee97262542c8..26411493537eeb54ddafa2e9e29a5723edcd0316:/services/keepstore/trash_worker.go diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go index 8a9fedfb70..819c25acc1 100644 --- a/services/keepstore/trash_worker.go +++ b/services/keepstore/trash_worker.go @@ -2,73 +2,186 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( - "errors" + "context" + "sync" + "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/prometheus/client_golang/prometheus" ) -// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine. -// The channel will process trash list. -// For each (next) trash request: -// Delete the block indicated by the trash request Locator -// Repeat -// -func RunTrashWorker(trashq *WorkQueue) { - for item := range trashq.NextItem { - trashRequest := item.(TrashRequest) - TrashItem(trashRequest) - trashq.DoneItem <- struct{}{} - } +type TrashListItem struct { + Locator string `json:"locator"` + BlockMtime int64 `json:"block_mtime"` + MountUUID string `json:"mount_uuid"` // Target mount, or "" for "everywhere" +} + +type trasher struct { + keepstore *keepstore + todo []TrashListItem + cond *sync.Cond // lock guards todo accesses; cond broadcasts when todo becomes non-empty + inprogress atomic.Int64 } -// TrashItem deletes the indicated block from every writable volume. -func TrashItem(trashRequest TrashRequest) { - reqMtime := time.Unix(0, trashRequest.BlockMtime) - if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() { - log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.", - arvados.Duration(time.Since(reqMtime)), - trashRequest.Locator, - trashRequest.BlockMtime, - reqMtime, - theConfig.BlobSignatureTTL) - return +func newTrasher(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *trasher { + t := &trasher{ + keepstore: keepstore, + cond: sync.NewCond(&sync.Mutex{}), + } + reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "keepstore", + Name: "trash_queue_pending_entries", + Help: "Number of queued trash requests", + }, + func() float64 { + t.cond.L.Lock() + defer t.cond.L.Unlock() + return float64(len(t.todo)) + }, + )) + reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "keepstore", + Name: "trash_queue_inprogress_entries", + Help: "Number of trash requests in progress", + }, + func() float64 { + return float64(t.inprogress.Load()) + }, + )) + if !keepstore.cluster.Collections.BlobTrash { + keepstore.logger.Info("not running trash worker because Collections.BlobTrash == false") + return t } - var volumes []Volume - if uuid := trashRequest.MountUUID; uuid == "" { - volumes = KeepVM.AllWritable() - } else if v := KeepVM.Lookup(uuid, true); v == nil { - log.Printf("warning: trash request for nonexistent mount: %v", trashRequest) - return + var mntsAllowTrash []*mount + for _, mnt := range t.keepstore.mounts { + if mnt.AllowTrash { + mntsAllowTrash = append(mntsAllowTrash, mnt) + } + } + if len(mntsAllowTrash) == 0 { + t.keepstore.logger.Info("not running trash worker because there are no writable or trashable volumes") } else { - volumes = []Volume{v} + for i := 0; i < keepstore.cluster.Collections.BlobTrashConcurrency; i++ { + go t.runWorker(ctx, mntsAllowTrash) + } } + return t +} + +func (t *trasher) SetTrashList(newlist []TrashListItem) { + t.cond.L.Lock() + t.todo = newlist + t.cond.L.Unlock() + t.cond.Broadcast() +} - for _, volume := range volumes { - mtime, err := volume.Mtime(trashRequest.Locator) - if err != nil { - log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err) - continue +func (t *trasher) runWorker(ctx context.Context, mntsAllowTrash []*mount) { + go func() { + <-ctx.Done() + t.cond.Broadcast() + }() + for { + t.cond.L.Lock() + for len(t.todo) == 0 && ctx.Err() == nil { + t.cond.Wait() } - if trashRequest.BlockMtime != mtime.UnixNano() { - log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime) - continue + if ctx.Err() != nil { + t.cond.L.Unlock() + return } + item := t.todo[0] + t.todo = t.todo[1:] + t.inprogress.Add(1) + t.cond.L.Unlock() - if !theConfig.EnableDelete { - err = errors.New("skipping because EnableDelete is false") - } else { - err = volume.Trash(trashRequest.Locator) - } + func() { + defer t.inprogress.Add(-1) + logger := t.keepstore.logger.WithField("locator", item.Locator) - if err != nil { - log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err) - } else { - log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator) - } + li, err := getLocatorInfo(item.Locator) + if err != nil { + logger.Warn("ignoring trash request for invalid locator") + return + } + + reqMtime := time.Unix(0, item.BlockMtime) + if time.Since(reqMtime) < t.keepstore.cluster.Collections.BlobSigningTTL.Duration() { + logger.Warnf("client asked to delete a %v old block (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.", + arvados.Duration(time.Since(reqMtime)), + item.BlockMtime, + reqMtime, + t.keepstore.cluster.Collections.BlobSigningTTL) + return + } + + var mnts []*mount + if item.MountUUID == "" { + mnts = mntsAllowTrash + } else if mnt := t.keepstore.mounts[item.MountUUID]; mnt == nil { + logger.Warnf("ignoring trash request for nonexistent mount %s", item.MountUUID) + return + } else if !mnt.AllowTrash { + logger.Warnf("ignoring trash request for readonly mount %s with AllowTrashWhenReadOnly==false", item.MountUUID) + return + } else { + mnts = []*mount{mnt} + } + + for _, mnt := range mnts { + logger := logger.WithField("mount", mnt.UUID) + mtime, err := mnt.Mtime(li.hash) + if err != nil { + logger.WithError(err).Error("error getting stored mtime") + continue + } + if !mtime.Equal(reqMtime) { + logger.Infof("stored mtime (%v) does not match trash list mtime (%v); skipping", mtime, reqMtime) + continue + } + err = mnt.BlockTrash(li.hash) + if err != nil { + logger.WithError(err).Info("error trashing block") + continue + } + logger.Info("block trashed") + } + }() } } + +type trashEmptier struct{} + +func newTrashEmptier(ctx context.Context, ks *keepstore, reg *prometheus.Registry) *trashEmptier { + d := ks.cluster.Collections.BlobTrashCheckInterval.Duration() + if d <= 0 || + !ks.cluster.Collections.BlobTrash || + ks.cluster.Collections.BlobDeleteConcurrency <= 0 { + ks.logger.Infof("not running trash emptier because disabled by config (enabled=%t, interval=%v, concurrency=%d)", ks.cluster.Collections.BlobTrash, d, ks.cluster.Collections.BlobDeleteConcurrency) + return &trashEmptier{} + } + go func() { + ticker := time.NewTicker(d) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + for _, mnt := range ks.mounts { + if mnt.KeepMount.AllowTrash { + mnt.volume.EmptyTrash() + } + } + } + }() + return &trashEmptier{} +}