7 "git.curoverse.com/arvados.git/sdk/go/arvados"
8 log "github.com/Sirupsen/logrus"
11 // RunTrashWorker is used by Keepstore to run the trash worker loop.
12 // It processes the trash list delivered through trashq.
13 // For each (next) trash request received from trashq.NextItem:
14 // trash the block indicated by the trash request Locator (via TrashItem),
// then send an empty struct on trashq.DoneItem.
//
// The loop ends when ranging over trashq.NextItem ends (presumably when that
// channel is closed -- TODO confirm against the WorkQueue definition).
//
// NOTE(review): this view of the file appears to be missing lines (for
// example the closing braces of the loop and of the function); confirm
// against the complete source before editing the code.
17 func RunTrashWorker(trashq *WorkQueue) {
18 for item := range trashq.NextItem {
// Unchecked type assertion: an item that is not a TrashRequest panics here.
19 trashRequest := item.(TrashRequest)
20 TrashItem(trashRequest)
// Signal on DoneItem only after TrashItem has returned for this item.
21 trashq.DoneItem <- struct{}{}
25 // TrashItem deletes the indicated block from every writable volume.
//
// trashRequest.BlockMtime is interpreted as nanoseconds since the Unix epoch.
// A request whose block is newer than theConfig.BlobSignatureTTL is reported
// with a warning (the message says it is skipped). Otherwise each volume from
// KeepVM.AllWritable() is asked for the block's stored mtime, which is
// compared with the requested BlockMtime before volume.Trash is called.
// Every outcome is reported through log.Printf; nothing is returned.
//
// NOTE(review): several lines appear to be missing from this view (closing
// braces, the conditions guarding some log calls, and any return/continue/else
// statements). The comments below describe only what is visible; confirm the
// control flow against the complete source.
26 func TrashItem(trashRequest TrashRequest) {
// Convert the requested mtime (nanoseconds) into a time.Time.
27 reqMtime := time.Unix(0, trashRequest.BlockMtime)
// Guard: the block is younger than the blob signature TTL.
// Presumably such blocks may still be referenced by valid signatures -- TODO confirm.
28 if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
// NOTE(review): the format string has five verbs but only three arguments
// are visible here; the locator and reqMtime arguments look to be among
// the missing lines.
29 log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
30 arvados.Duration(time.Since(reqMtime)),
32 trashRequest.BlockMtime,
34 theConfig.BlobSignatureTTL)
// Visit every writable volume independently.
38 for _, volume := range KeepVM.AllWritable() {
// Look up the mtime this volume has stored for the block.
39 mtime, err := volume.Mtime(trashRequest.Locator)
// Logs the Mtime error; presumably guarded by an `if err != nil` that is not visible here.
41 log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
// The stored mtime must equal the requested one exactly (nanosecond
// precision); a mismatch is logged.
44 if trashRequest.BlockMtime != mtime.UnixNano() {
45 log.Printf("%v Delete(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
// When deletion is disabled by configuration, record that as the error
// reported for this volume.
49 if !theConfig.EnableDelete {
50 err = errors.New("did not delete block because EnableDelete is false")
// Presumably the else branch of the EnableDelete check -- TODO confirm.
52 err = volume.Trash(trashRequest.Locator)
// Report the result for this volume: the error, or OK.
// Presumably an if/else on err that is not visible here.
56 log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
58 log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)