8a9fedfb7007ca21ef1d5d2e482ce66464fafe1a
[arvados.git] / services / keepstore / trash_worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "errors"
9         "time"
10
11         "git.curoverse.com/arvados.git/sdk/go/arvados"
12 )
13
14 // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
15 //      The channel will process trash list.
16 //              For each (next) trash request:
17 //      Delete the block indicated by the trash request Locator
18 //              Repeat
19 //
20 func RunTrashWorker(trashq *WorkQueue) {
21         for item := range trashq.NextItem {
22                 trashRequest := item.(TrashRequest)
23                 TrashItem(trashRequest)
24                 trashq.DoneItem <- struct{}{}
25         }
26 }
27
28 // TrashItem deletes the indicated block from every writable volume.
29 func TrashItem(trashRequest TrashRequest) {
30         reqMtime := time.Unix(0, trashRequest.BlockMtime)
31         if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
32                 log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
33                         arvados.Duration(time.Since(reqMtime)),
34                         trashRequest.Locator,
35                         trashRequest.BlockMtime,
36                         reqMtime,
37                         theConfig.BlobSignatureTTL)
38                 return
39         }
40
41         var volumes []Volume
42         if uuid := trashRequest.MountUUID; uuid == "" {
43                 volumes = KeepVM.AllWritable()
44         } else if v := KeepVM.Lookup(uuid, true); v == nil {
45                 log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
46                 return
47         } else {
48                 volumes = []Volume{v}
49         }
50
51         for _, volume := range volumes {
52                 mtime, err := volume.Mtime(trashRequest.Locator)
53                 if err != nil {
54                         log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
55                         continue
56                 }
57                 if trashRequest.BlockMtime != mtime.UnixNano() {
58                         log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
59                         continue
60                 }
61
62                 if !theConfig.EnableDelete {
63                         err = errors.New("skipping because EnableDelete is false")
64                 } else {
65                         err = volume.Trash(trashRequest.Locator)
66                 }
67
68                 if err != nil {
69                         log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
70                 } else {
71                         log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)
72                 }
73         }
74 }