Merge branch '8784-dir-listings'
[arvados.git] / services / keepstore / trash_worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "errors"
9         "time"
10
11         "git.curoverse.com/arvados.git/sdk/go/arvados"
12         log "github.com/Sirupsen/logrus"
13 )
14
15 // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
16 //      The channel will process trash list.
17 //              For each (next) trash request:
18 //      Delete the block indicated by the trash request Locator
19 //              Repeat
20 //
21 func RunTrashWorker(trashq *WorkQueue) {
22         for item := range trashq.NextItem {
23                 trashRequest := item.(TrashRequest)
24                 TrashItem(trashRequest)
25                 trashq.DoneItem <- struct{}{}
26         }
27 }
28
29 // TrashItem deletes the indicated block from every writable volume.
30 func TrashItem(trashRequest TrashRequest) {
31         reqMtime := time.Unix(0, trashRequest.BlockMtime)
32         if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
33                 log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
34                         arvados.Duration(time.Since(reqMtime)),
35                         trashRequest.Locator,
36                         trashRequest.BlockMtime,
37                         reqMtime,
38                         theConfig.BlobSignatureTTL)
39                 return
40         }
41
42         var volumes []Volume
43         if uuid := trashRequest.MountUUID; uuid == "" {
44                 volumes = KeepVM.AllWritable()
45         } else if v := KeepVM.Lookup(uuid, true); v == nil {
46                 log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
47                 return
48         } else {
49                 volumes = []Volume{v}
50         }
51
52         for _, volume := range volumes {
53                 mtime, err := volume.Mtime(trashRequest.Locator)
54                 if err != nil {
55                         log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
56                         continue
57                 }
58                 if trashRequest.BlockMtime != mtime.UnixNano() {
59                         log.Printf("%v Delete(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
60                         continue
61                 }
62
63                 if !theConfig.EnableDelete {
64                         err = errors.New("did not delete block because EnableDelete is false")
65                 } else {
66                         err = volume.Trash(trashRequest.Locator)
67                 }
68
69                 if err != nil {
70                         log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
71                 } else {
72                         log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
73                 }
74         }
75 }