Change statfile in run.py to be able to raise OSError, and change the mock in test_pa...
[arvados.git] / services / keepstore / trash_worker.go
1 package main
2
3 import (
4         "errors"
5         "time"
6
7         "git.curoverse.com/arvados.git/sdk/go/arvados"
8         log "github.com/Sirupsen/logrus"
9 )
10
11 // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
12 //      The channel will process trash list.
13 //              For each (next) trash request:
14 //      Delete the block indicated by the trash request Locator
15 //              Repeat
16 //
17 func RunTrashWorker(trashq *WorkQueue) {
18         for item := range trashq.NextItem {
19                 trashRequest := item.(TrashRequest)
20                 TrashItem(trashRequest)
21                 trashq.DoneItem <- struct{}{}
22         }
23 }
24
25 // TrashItem deletes the indicated block from every writable volume.
26 func TrashItem(trashRequest TrashRequest) {
27         reqMtime := time.Unix(0, trashRequest.BlockMtime)
28         if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
29                 log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
30                         arvados.Duration(time.Since(reqMtime)),
31                         trashRequest.Locator,
32                         trashRequest.BlockMtime,
33                         reqMtime,
34                         theConfig.BlobSignatureTTL)
35                 return
36         }
37
38         var volumes []Volume
39         if uuid := trashRequest.MountUUID; uuid == "" {
40                 volumes = KeepVM.AllWritable()
41         } else if v := KeepVM.Lookup(uuid, true); v == nil {
42                 log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
43                 return
44         } else {
45                 volumes = []Volume{v}
46         }
47
48         for _, volume := range volumes {
49                 mtime, err := volume.Mtime(trashRequest.Locator)
50                 if err != nil {
51                         log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
52                         continue
53                 }
54                 if trashRequest.BlockMtime != mtime.UnixNano() {
55                         log.Printf("%v Delete(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
56                         continue
57                 }
58
59                 if !theConfig.EnableDelete {
60                         err = errors.New("did not delete block because EnableDelete is false")
61                 } else {
62                         err = volume.Trash(trashRequest.Locator)
63                 }
64
65                 if err != nil {
66                         log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
67                 } else {
68                         log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
69                 }
70         }
71 }