1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
11 "git.arvados.org/arvados.git/sdk/go/arvados"
12 "github.com/sirupsen/logrus"
15 // RunTrashWorker processes the trash request queue.
16 func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) {
17 for item := range trashq.NextItem {
18 trashRequest := item.(TrashRequest)
19 TrashItem(volmgr, logger, cluster, trashRequest)
20 trashq.DoneItem <- struct{}{}
24 // TrashItem deletes the indicated block from every writable volume.
25 func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) {
26 reqMtime := time.Unix(0, trashRequest.BlockMtime)
27 if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() {
28 logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
29 arvados.Duration(time.Since(reqMtime)),
31 trashRequest.BlockMtime,
33 cluster.Collections.BlobSigningTTL)
37 var volumes []*VolumeMount
38 if uuid := trashRequest.MountUUID; uuid == "" {
39 volumes = volmgr.AllWritable()
40 } else if mnt := volmgr.Lookup(uuid, true); mnt == nil {
41 logger.Warnf("trash request for nonexistent mount: %v", trashRequest)
44 volumes = []*VolumeMount{mnt}
47 for _, volume := range volumes {
48 mtime, err := volume.Mtime(trashRequest.Locator)
50 logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
53 if trashRequest.BlockMtime != mtime.UnixNano() {
54 logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
58 if !cluster.Collections.BlobTrash {
59 err = errors.New("skipping because Collections.BlobTrash is false")
61 err = volume.Trash(trashRequest.Locator)
65 logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
67 logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator)