Merge branch 'main' from workbench2.git
[arvados.git] / services / keepstore / trash_worker_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "container/list"
9         "context"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/ctxlog"
13         "github.com/prometheus/client_golang/prometheus"
14         check "gopkg.in/check.v1"
15 )
16
// TrashWorkerTestData describes a single scenario for
// performTrashWorkerTest: which blocks to store, which trash request
// to submit, and which blocks must still be readable afterwards.
type TrashWorkerTestData struct {
	// First block. When CreateData is set it is written to the
	// first writable mount.
	Locator1    string
	Block1      []byte
	BlockMtime1 int64 // not read by performTrashWorkerTest

	// Second block. When CreateData is set it is written to the
	// second writable mount, or to the first one if
	// CreateInVolume1 is set.
	Locator2    string
	Block2      []byte
	BlockMtime2 int64 // not read by performTrashWorkerTest

	// CreateData: store both blocks before submitting the trash
	// request. CreateInVolume1: put the second block on the first
	// mount instead of the second.
	CreateData, CreateInVolume1 bool

	// UseTrashLifeTime: leave block timestamps alone instead of
	// back-dating them. DifferentMtimes: when back-dating, give
	// each mount a timestamp one second later than the previous
	// one, so only the first matches the trash request.
	UseTrashLifeTime, DifferentMtimes bool

	// DeleteLocator is the locator named in the trash request.
	// SpecifyMountUUID additionally sets the request's MountUUID
	// to the UUID of the first mount.
	DeleteLocator    string
	SpecifyMountUUID bool

	// Whether each block is expected to be readable once the
	// trash worker has finished.
	ExpectLocator1, ExpectLocator2 bool
}
38
39 // Delete block that does not exist in any of the keep volumes.
40 // Expect no errors.
41 func (s *HandlerSuite) TestTrashWorkerIntegration_GetNonExistingLocator(c *check.C) {
42         s.cluster.Collections.BlobTrash = true
43         testData := TrashWorkerTestData{
44                 Locator1: "5d41402abc4b2a76b9719d911017c592",
45                 Block1:   []byte("hello"),
46
47                 Locator2: "5d41402abc4b2a76b9719d911017c592",
48                 Block2:   []byte("hello"),
49
50                 CreateData: false,
51
52                 DeleteLocator: "5d41402abc4b2a76b9719d911017c592",
53
54                 ExpectLocator1: false,
55                 ExpectLocator2: false,
56         }
57         s.performTrashWorkerTest(c, testData)
58 }
59
60 // Delete a block that exists on volume 1 of the keep servers. Expect
61 // the second locator in volume 2 to be unaffected.
62 func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume1(c *check.C) {
63         s.cluster.Collections.BlobTrash = true
64         testData := TrashWorkerTestData{
65                 Locator1: TestHash,
66                 Block1:   TestBlock,
67
68                 Locator2: TestHash2,
69                 Block2:   TestBlock2,
70
71                 CreateData: true,
72
73                 DeleteLocator: TestHash, // first locator
74
75                 ExpectLocator1: false,
76                 ExpectLocator2: true,
77         }
78         s.performTrashWorkerTest(c, testData)
79 }
80
81 // Delete a block that exists on volume 2 of the keep servers. Expect
82 // the first locator in volume 1 to be unaffected.
83 func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume2(c *check.C) {
84         s.cluster.Collections.BlobTrash = true
85         testData := TrashWorkerTestData{
86                 Locator1: TestHash,
87                 Block1:   TestBlock,
88
89                 Locator2: TestHash2,
90                 Block2:   TestBlock2,
91
92                 CreateData: true,
93
94                 DeleteLocator: TestHash2, // locator 2
95
96                 ExpectLocator1: true,
97                 ExpectLocator2: false,
98         }
99         s.performTrashWorkerTest(c, testData)
100 }
101
102 // Delete a block with matching mtime for locator in both
103 // volumes. Expect locator to be deleted from both volumes.
104 func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInBothVolumes(c *check.C) {
105         s.cluster.Collections.BlobTrash = true
106         testData := TrashWorkerTestData{
107                 Locator1: TestHash,
108                 Block1:   TestBlock,
109
110                 Locator2: TestHash,
111                 Block2:   TestBlock,
112
113                 CreateData: true,
114
115                 DeleteLocator: TestHash,
116
117                 ExpectLocator1: false,
118                 ExpectLocator2: false,
119         }
120         s.performTrashWorkerTest(c, testData)
121 }
122
123 // Same locator with different Mtimes exists in both volumes. Delete
124 // the second and expect the first to be still around.
125 func (s *HandlerSuite) TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(c *check.C) {
126         s.cluster.Collections.BlobTrash = true
127         testData := TrashWorkerTestData{
128                 Locator1: TestHash,
129                 Block1:   TestBlock,
130
131                 Locator2: TestHash,
132                 Block2:   TestBlock,
133
134                 CreateData:      true,
135                 DifferentMtimes: true,
136
137                 DeleteLocator: TestHash,
138
139                 ExpectLocator1: true,
140                 ExpectLocator2: false,
141         }
142         s.performTrashWorkerTest(c, testData)
143 }
144
145 // Delete a block that exists on both volumes with matching mtimes,
146 // but specify a MountUUID in the request so it only gets deleted from
147 // the first volume.
148 func (s *HandlerSuite) TestTrashWorkerIntegration_SpecifyMountUUID(c *check.C) {
149         s.cluster.Collections.BlobTrash = true
150         testData := TrashWorkerTestData{
151                 Locator1: TestHash,
152                 Block1:   TestBlock,
153
154                 Locator2: TestHash,
155                 Block2:   TestBlock,
156
157                 CreateData: true,
158
159                 DeleteLocator:    TestHash,
160                 SpecifyMountUUID: true,
161
162                 ExpectLocator1: true,
163                 ExpectLocator2: true,
164         }
165         s.performTrashWorkerTest(c, testData)
166 }
167
168 // Two different locators in volume 1. Delete one of them. Expect the
169 // other unaffected.
170 func (s *HandlerSuite) TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(c *check.C) {
171         s.cluster.Collections.BlobTrash = true
172         testData := TrashWorkerTestData{
173                 Locator1: TestHash,
174                 Block1:   TestBlock,
175
176                 Locator2: TestHash2,
177                 Block2:   TestBlock2,
178
179                 CreateData:      true,
180                 CreateInVolume1: true,
181
182                 DeleteLocator: TestHash, // locator 1
183
184                 ExpectLocator1: false,
185                 ExpectLocator2: true,
186         }
187         s.performTrashWorkerTest(c, testData)
188 }
189
190 // Allow default Trash Life time to be used. Thus, the newly created
191 // block will not be deleted because its Mtime is within the trash
192 // life time.
193 func (s *HandlerSuite) TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(c *check.C) {
194         s.cluster.Collections.BlobTrash = true
195         testData := TrashWorkerTestData{
196                 Locator1: TestHash,
197                 Block1:   TestBlock,
198
199                 Locator2: TestHash2,
200                 Block2:   TestBlock2,
201
202                 CreateData:      true,
203                 CreateInVolume1: true,
204
205                 UseTrashLifeTime: true,
206
207                 DeleteLocator: TestHash, // locator 1
208
209                 // Since trash life time is in effect, block won't be deleted.
210                 ExpectLocator1: true,
211                 ExpectLocator2: true,
212         }
213         s.performTrashWorkerTest(c, testData)
214 }
215
216 // Delete a block with matching mtime for locator in both volumes, but
217 // EnableDelete is false, so block won't be deleted.
218 func (s *HandlerSuite) TestTrashWorkerIntegration_DisabledDelete(c *check.C) {
219         s.cluster.Collections.BlobTrash = false
220         testData := TrashWorkerTestData{
221                 Locator1: TestHash,
222                 Block1:   TestBlock,
223
224                 Locator2: TestHash,
225                 Block2:   TestBlock,
226
227                 CreateData: true,
228
229                 DeleteLocator: TestHash,
230
231                 ExpectLocator1: true,
232                 ExpectLocator2: true,
233         }
234         s.performTrashWorkerTest(c, testData)
235 }
236
237 func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTestData) {
238         c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
239         // Replace the router's trashq -- which the worker goroutines
240         // started by setup() are now receiving from -- with a new
241         // one, so we can see what the handler sends to it.
242         trashq := NewWorkQueue()
243         s.handler.Handler.(*router).trashq = trashq
244
245         // Put test content
246         mounts := s.handler.volmgr.AllWritable()
247         if testData.CreateData {
248                 mounts[0].Put(context.Background(), testData.Locator1, testData.Block1)
249                 mounts[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
250
251                 if testData.CreateInVolume1 {
252                         mounts[0].Put(context.Background(), testData.Locator2, testData.Block2)
253                         mounts[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
254                 } else {
255                         mounts[1].Put(context.Background(), testData.Locator2, testData.Block2)
256                         mounts[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
257                 }
258         }
259
260         oldBlockTime := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Minute)
261
262         // Create TrashRequest for the test
263         trashRequest := TrashRequest{
264                 Locator:    testData.DeleteLocator,
265                 BlockMtime: oldBlockTime.UnixNano(),
266         }
267         if testData.SpecifyMountUUID {
268                 trashRequest.MountUUID = s.handler.volmgr.Mounts()[0].UUID
269         }
270
271         // Run trash worker and put the trashRequest on trashq
272         trashList := list.New()
273         trashList.PushBack(trashRequest)
274
275         if !testData.UseTrashLifeTime {
276                 // Trash worker would not delete block if its Mtime is
277                 // within trash life time. Back-date the block to
278                 // allow the deletion to succeed.
279                 for _, mnt := range mounts {
280                         mnt.Volume.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
281                         if testData.DifferentMtimes {
282                                 oldBlockTime = oldBlockTime.Add(time.Second)
283                         }
284                 }
285         }
286         go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)
287
288         // Install gate so all local operations block until we say go
289         gate := make(chan struct{})
290         for _, mnt := range mounts {
291                 mnt.Volume.(*MockVolume).Gate = gate
292         }
293
294         assertStatusItem := func(k string, expect float64) {
295                 if v := getStatusItem(s.handler, "TrashQueue", k); v != expect {
296                         c.Errorf("Got %s %v, expected %v", k, v, expect)
297                 }
298         }
299
300         assertStatusItem("InProgress", 0)
301         assertStatusItem("Queued", 0)
302
303         listLen := trashList.Len()
304         trashq.ReplaceQueue(trashList)
305
306         // Wait for worker to take request(s)
307         expectEqualWithin(c, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
308
309         // Ensure status.json also reports work is happening
310         assertStatusItem("InProgress", float64(1))
311         assertStatusItem("Queued", float64(listLen-1))
312
313         // Let worker proceed
314         close(gate)
315
316         // Wait for worker to finish
317         expectEqualWithin(c, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
318
319         // Verify Locator1 to be un/deleted as expected
320         buf := make([]byte, BlockSize)
321         size, err := GetBlock(context.Background(), s.handler.volmgr, testData.Locator1, buf, nil)
322         if testData.ExpectLocator1 {
323                 if size == 0 || err != nil {
324                         c.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
325                 }
326         } else {
327                 if size > 0 || err == nil {
328                         c.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
329                 }
330         }
331
332         // Verify Locator2 to be un/deleted as expected
333         if testData.Locator1 != testData.Locator2 {
334                 size, err = GetBlock(context.Background(), s.handler.volmgr, testData.Locator2, buf, nil)
335                 if testData.ExpectLocator2 {
336                         if size == 0 || err != nil {
337                                 c.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
338                         }
339                 } else {
340                         if size > 0 || err == nil {
341                                 c.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
342                         }
343                 }
344         }
345
346         // The DifferentMtimes test puts the same locator in two
347         // different volumes, but only one copy has an Mtime matching
348         // the trash request.
349         if testData.DifferentMtimes {
350                 locatorFoundIn := 0
351                 for _, volume := range s.handler.volmgr.AllReadable() {
352                         buf := make([]byte, BlockSize)
353                         if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
354                                 locatorFoundIn = locatorFoundIn + 1
355                         }
356                 }
357                 c.Check(locatorFoundIn, check.Equals, 1)
358         }
359 }