Merge branch 'pr/25'
[arvados.git] / services / keepstore / trash_worker.go
1 package main
2
3 import (
4         "errors"
5         "log"
6         "time"
7 )
8
9 // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
10 //      The channel will process trash list.
11 //              For each (next) trash request:
12 //      Delete the block indicated by the trash request Locator
13 //              Repeat
14 //
15 func RunTrashWorker(trashq *WorkQueue) {
16         for item := range trashq.NextItem {
17                 trashRequest := item.(TrashRequest)
18                 TrashItem(trashRequest)
19                 trashq.DoneItem <- struct{}{}
20         }
21 }
22
23 // TrashItem deletes the indicated block from every writable volume.
24 func TrashItem(trashRequest TrashRequest) {
25         reqMtime := time.Unix(trashRequest.BlockMtime, 0)
26         if time.Since(reqMtime) < blobSignatureTTL {
27                 log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
28                         time.Since(reqMtime),
29                         trashRequest.Locator,
30                         trashRequest.BlockMtime,
31                         reqMtime,
32                         blobSignatureTTL)
33                 return
34         }
35
36         for _, volume := range KeepVM.AllWritable() {
37                 mtime, err := volume.Mtime(trashRequest.Locator)
38                 if err != nil {
39                         log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
40                         continue
41                 }
42                 if trashRequest.BlockMtime != mtime.Unix() {
43                         log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
44                         continue
45                 }
46
47                 if neverDelete {
48                         err = errors.New("did not delete block because neverDelete is true")
49                 } else {
50                         err = volume.Delete(trashRequest.Locator)
51                 }
52
53                 if err != nil {
54                         log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
55                 } else {
56                         log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
57                 }
58         }
59 }