Merge branch '21189-changeset-limit'
[arvados.git] / services / keepstore / trash_worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "errors"
9         "time"
10
11         "git.arvados.org/arvados.git/sdk/go/arvados"
12         "github.com/sirupsen/logrus"
13 )
14
15 // RunTrashWorker processes the trash request queue.
16 func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) {
17         for item := range trashq.NextItem {
18                 trashRequest := item.(TrashRequest)
19                 TrashItem(volmgr, logger, cluster, trashRequest)
20                 trashq.DoneItem <- struct{}{}
21         }
22 }
23
24 // TrashItem deletes the indicated block from every writable volume.
25 func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) {
26         reqMtime := time.Unix(0, trashRequest.BlockMtime)
27         if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() {
28                 logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
29                         arvados.Duration(time.Since(reqMtime)),
30                         trashRequest.Locator,
31                         trashRequest.BlockMtime,
32                         reqMtime,
33                         cluster.Collections.BlobSigningTTL)
34                 return
35         }
36
37         var volumes []*VolumeMount
38         if uuid := trashRequest.MountUUID; uuid == "" {
39                 volumes = volmgr.Mounts()
40         } else if mnt := volmgr.Lookup(uuid, false); mnt == nil {
41                 logger.Warnf("trash request for nonexistent mount: %v", trashRequest)
42                 return
43         } else if !mnt.KeepMount.AllowTrash {
44                 logger.Warnf("trash request for mount with ReadOnly=true, AllowTrashWhenReadOnly=false: %v", trashRequest)
45         } else {
46                 volumes = []*VolumeMount{mnt}
47         }
48
49         for _, volume := range volumes {
50                 mtime, err := volume.Mtime(trashRequest.Locator)
51                 if err != nil {
52                         logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
53                         continue
54                 }
55                 if trashRequest.BlockMtime != mtime.UnixNano() {
56                         logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
57                         continue
58                 }
59
60                 if !cluster.Collections.BlobTrash {
61                         err = errors.New("skipping because Collections.BlobTrash is false")
62                 } else {
63                         err = volume.Trash(trashRequest.Locator)
64                 }
65
66                 if err != nil {
67                         logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
68                 } else {
69                         logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator)
70                 }
71         }
72 }