refs #6588
[arvados.git] / services / keepstore / trash_worker.go
1 package main
2
3 import (
4         "errors"
5         "log"
6         "time"
7 )
8
/*
	Keepstore starts the trash worker in a goroutine.
	The worker processes the trash list delivered through the work queue:
		For each (next) trash request:
			Delete the block indicated by the trash request Locator
		Repeat
*/
16
17 func RunTrashWorker(trashq *WorkQueue) {
18         for item := range trashq.NextItem {
19                 trashRequest := item.(TrashRequest)
20                 TrashItem(trashRequest)
21         }
22 }
23
24 // TrashItem deletes the indicated block from every writable volume.
25 func TrashItem(trashRequest TrashRequest) {
26         reqMtime := time.Unix(trashRequest.BlockMtime, 0)
27         if time.Since(reqMtime) < blob_signature_ttl {
28                 log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blob_signature_ttl is %v! Skipping.",
29                         time.Since(reqMtime),
30                         trashRequest.Locator,
31                         trashRequest.BlockMtime,
32                         reqMtime,
33                         blob_signature_ttl)
34                 return
35         }
36
37         for _, volume := range KeepVM.AllWritable() {
38                 mtime, err := volume.Mtime(trashRequest.Locator)
39                 if err != nil {
40                         log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
41                         continue
42                 }
43                 if trashRequest.BlockMtime != mtime.Unix() {
44                         log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
45                         continue
46                 }
47
48                 if never_delete {
49                         err = errors.New("did not delete block because never_delete is true")
50                 } else {
51                         err = volume.Delete(trashRequest.Locator)
52                 }
53
54                 if err != nil {
55                         log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
56                 } else {
57                         log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
58                 }
59         }
60 }