1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.curoverse.com/arvados.git/sdk/go/arvados"
15 // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
16 // The channel will process trash list.
17 // For each (next) trash request:
18 // Delete the block indicated by the trash request Locator
21 func RunTrashWorker(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashq *WorkQueue) {
22 for item := range trashq.NextItem {
23 trashRequest := item.(TrashRequest)
24 TrashItem(volmgr, cluster, trashRequest)
25 trashq.DoneItem <- struct{}{}
29 // TrashItem deletes the indicated block from every writable volume.
30 func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest TrashRequest) {
31 reqMtime := time.Unix(0, trashRequest.BlockMtime)
32 if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() {
33 log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
34 arvados.Duration(time.Since(reqMtime)),
36 trashRequest.BlockMtime,
38 cluster.Collections.BlobSigningTTL)
42 var volumes []*VolumeMount
43 if uuid := trashRequest.MountUUID; uuid == "" {
44 volumes = volmgr.AllWritable()
45 } else if mnt := volmgr.Lookup(uuid, true); mnt == nil {
46 log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
49 volumes = []*VolumeMount{mnt}
52 for _, volume := range volumes {
53 mtime, err := volume.Mtime(trashRequest.Locator)
55 log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
58 if trashRequest.BlockMtime != mtime.UnixNano() {
59 log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
63 if !cluster.Collections.BlobTrash {
64 err = errors.New("skipping because Collections.BlobTrash is false")
66 err = volume.Trash(trashRequest.Locator)
70 log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
72 log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)