Merge branch '6260-work-queue-status' refs #6260
[arvados.git] / services / keepstore / trash_worker.go
1 package main
2
3 import (
4         "errors"
5         "log"
6         "time"
7 )
8
9 /*
10         Keepstore initiates trash worker channel goroutine.
11         The channel will process trash list.
12                 For each (next) trash request:
13       Delete the block indicated by the trash request Locator
14                 Repeat
15 */
16
17 func RunTrashWorker(trashq *WorkQueue) {
18         for item := range trashq.NextItem {
19                 trashRequest := item.(TrashRequest)
20                 TrashItem(trashRequest)
21                 trashq.DoneItem <- struct{}{}
22         }
23 }
24
25 // TrashItem deletes the indicated block from every writable volume.
26 func TrashItem(trashRequest TrashRequest) {
27         reqMtime := time.Unix(trashRequest.BlockMtime, 0)
28         if time.Since(reqMtime) < blob_signature_ttl {
29                 log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blob_signature_ttl is %v! Skipping.",
30                         time.Since(reqMtime),
31                         trashRequest.Locator,
32                         trashRequest.BlockMtime,
33                         reqMtime,
34                         blob_signature_ttl)
35                 return
36         }
37
38         for _, volume := range KeepVM.AllWritable() {
39                 mtime, err := volume.Mtime(trashRequest.Locator)
40                 if err != nil {
41                         log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
42                         continue
43                 }
44                 if trashRequest.BlockMtime != mtime.Unix() {
45                         log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
46                         continue
47                 }
48
49                 if never_delete {
50                         err = errors.New("did not delete block because never_delete is true")
51                 } else {
52                         err = volume.Delete(trashRequest.Locator)
53                 }
54
55                 if err != nil {
56                         log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
57                 } else {
58                         log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
59                 }
60         }
61 }