Merge branch '13647-keepstore-config'
[arvados.git] / services / keepstore / trash_worker_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "time"
11
12         "github.com/prometheus/client_golang/prometheus"
13         check "gopkg.in/check.v1"
14 )
15
16 type TrashWorkerTestData struct {
17         Locator1    string
18         Block1      []byte
19         BlockMtime1 int64
20
21         Locator2    string
22         Block2      []byte
23         BlockMtime2 int64
24
25         CreateData      bool
26         CreateInVolume1 bool
27
28         UseTrashLifeTime bool
29         DifferentMtimes  bool
30
31         DeleteLocator    string
32         SpecifyMountUUID bool
33
34         ExpectLocator1 bool
35         ExpectLocator2 bool
36 }
37
38 /* Delete block that does not exist in any of the keep volumes.
39    Expect no errors.
40 */
41 func (s *HandlerSuite) TestTrashWorkerIntegration_GetNonExistingLocator(c *check.C) {
42         s.cluster.Collections.BlobTrash = true
43         testData := TrashWorkerTestData{
44                 Locator1: "5d41402abc4b2a76b9719d911017c592",
45                 Block1:   []byte("hello"),
46
47                 Locator2: "5d41402abc4b2a76b9719d911017c592",
48                 Block2:   []byte("hello"),
49
50                 CreateData: false,
51
52                 DeleteLocator: "5d41402abc4b2a76b9719d911017c592",
53
54                 ExpectLocator1: false,
55                 ExpectLocator2: false,
56         }
57         s.performTrashWorkerTest(c, testData)
58 }
59
60 /* Delete a block that exists on volume 1 of the keep servers.
61    Expect the second locator in volume 2 to be unaffected.
62 */
63 func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume1(c *check.C) {
64         s.cluster.Collections.BlobTrash = true
65         testData := TrashWorkerTestData{
66                 Locator1: TestHash,
67                 Block1:   TestBlock,
68
69                 Locator2: TestHash2,
70                 Block2:   TestBlock2,
71
72                 CreateData: true,
73
74                 DeleteLocator: TestHash, // first locator
75
76                 ExpectLocator1: false,
77                 ExpectLocator2: true,
78         }
79         s.performTrashWorkerTest(c, testData)
80 }
81
82 /* Delete a block that exists on volume 2 of the keep servers.
83    Expect the first locator in volume 1 to be unaffected.
84 */
85 func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume2(c *check.C) {
86         s.cluster.Collections.BlobTrash = true
87         testData := TrashWorkerTestData{
88                 Locator1: TestHash,
89                 Block1:   TestBlock,
90
91                 Locator2: TestHash2,
92                 Block2:   TestBlock2,
93
94                 CreateData: true,
95
96                 DeleteLocator: TestHash2, // locator 2
97
98                 ExpectLocator1: true,
99                 ExpectLocator2: false,
100         }
101         s.performTrashWorkerTest(c, testData)
102 }
103
104 /* Delete a block with matching mtime for locator in both volumes.
105    Expect locator to be deleted from both volumes.
106 */
107 func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInBothVolumes(c *check.C) {
108         s.cluster.Collections.BlobTrash = true
109         testData := TrashWorkerTestData{
110                 Locator1: TestHash,
111                 Block1:   TestBlock,
112
113                 Locator2: TestHash,
114                 Block2:   TestBlock,
115
116                 CreateData: true,
117
118                 DeleteLocator: TestHash,
119
120                 ExpectLocator1: false,
121                 ExpectLocator2: false,
122         }
123         s.performTrashWorkerTest(c, testData)
124 }
125
126 /* Same locator with different Mtimes exists in both volumes.
127    Delete the second and expect the first to be still around.
128 */
129 func (s *HandlerSuite) TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(c *check.C) {
130         s.cluster.Collections.BlobTrash = true
131         testData := TrashWorkerTestData{
132                 Locator1: TestHash,
133                 Block1:   TestBlock,
134
135                 Locator2: TestHash,
136                 Block2:   TestBlock,
137
138                 CreateData:      true,
139                 DifferentMtimes: true,
140
141                 DeleteLocator: TestHash,
142
143                 ExpectLocator1: true,
144                 ExpectLocator2: false,
145         }
146         s.performTrashWorkerTest(c, testData)
147 }
148
149 // Delete a block that exists on both volumes with matching mtimes,
150 // but specify a MountUUID in the request so it only gets deleted from
151 // the first volume.
152 func (s *HandlerSuite) TestTrashWorkerIntegration_SpecifyMountUUID(c *check.C) {
153         s.cluster.Collections.BlobTrash = true
154         testData := TrashWorkerTestData{
155                 Locator1: TestHash,
156                 Block1:   TestBlock,
157
158                 Locator2: TestHash,
159                 Block2:   TestBlock,
160
161                 CreateData: true,
162
163                 DeleteLocator:    TestHash,
164                 SpecifyMountUUID: true,
165
166                 ExpectLocator1: true,
167                 ExpectLocator2: true,
168         }
169         s.performTrashWorkerTest(c, testData)
170 }
171
172 /* Two different locators in volume 1.
173    Delete one of them.
174    Expect the other unaffected.
175 */
176 func (s *HandlerSuite) TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(c *check.C) {
177         s.cluster.Collections.BlobTrash = true
178         testData := TrashWorkerTestData{
179                 Locator1: TestHash,
180                 Block1:   TestBlock,
181
182                 Locator2: TestHash2,
183                 Block2:   TestBlock2,
184
185                 CreateData:      true,
186                 CreateInVolume1: true,
187
188                 DeleteLocator: TestHash, // locator 1
189
190                 ExpectLocator1: false,
191                 ExpectLocator2: true,
192         }
193         s.performTrashWorkerTest(c, testData)
194 }
195
196 /* Allow default Trash Life time to be used. Thus, the newly created block
197    will not be deleted because its Mtime is within the trash life time.
198 */
199 func (s *HandlerSuite) TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(c *check.C) {
200         s.cluster.Collections.BlobTrash = true
201         testData := TrashWorkerTestData{
202                 Locator1: TestHash,
203                 Block1:   TestBlock,
204
205                 Locator2: TestHash2,
206                 Block2:   TestBlock2,
207
208                 CreateData:      true,
209                 CreateInVolume1: true,
210
211                 UseTrashLifeTime: true,
212
213                 DeleteLocator: TestHash, // locator 1
214
215                 // Since trash life time is in effect, block won't be deleted.
216                 ExpectLocator1: true,
217                 ExpectLocator2: true,
218         }
219         s.performTrashWorkerTest(c, testData)
220 }
221
222 /* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
223    so block won't be deleted.
224 */
225 func (s *HandlerSuite) TestTrashWorkerIntegration_DisabledDelete(c *check.C) {
226         s.cluster.Collections.BlobTrash = false
227         testData := TrashWorkerTestData{
228                 Locator1: TestHash,
229                 Block1:   TestBlock,
230
231                 Locator2: TestHash,
232                 Block2:   TestBlock,
233
234                 CreateData: true,
235
236                 DeleteLocator: TestHash,
237
238                 ExpectLocator1: true,
239                 ExpectLocator2: true,
240         }
241         s.performTrashWorkerTest(c, testData)
242 }
243
244 /* Perform the test */
245 func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTestData) {
246         c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
247         // Replace the router's trashq -- which the worker goroutines
248         // started by setup() are now receiving from -- with a new
249         // one, so we can see what the handler sends to it.
250         trashq := NewWorkQueue()
251         s.handler.Handler.(*router).trashq = trashq
252
253         // Put test content
254         mounts := s.handler.volmgr.AllWritable()
255         if testData.CreateData {
256                 mounts[0].Put(context.Background(), testData.Locator1, testData.Block1)
257                 mounts[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
258
259                 if testData.CreateInVolume1 {
260                         mounts[0].Put(context.Background(), testData.Locator2, testData.Block2)
261                         mounts[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
262                 } else {
263                         mounts[1].Put(context.Background(), testData.Locator2, testData.Block2)
264                         mounts[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
265                 }
266         }
267
268         oldBlockTime := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Minute)
269
270         // Create TrashRequest for the test
271         trashRequest := TrashRequest{
272                 Locator:    testData.DeleteLocator,
273                 BlockMtime: oldBlockTime.UnixNano(),
274         }
275         if testData.SpecifyMountUUID {
276                 trashRequest.MountUUID = s.handler.volmgr.Mounts()[0].UUID
277         }
278
279         // Run trash worker and put the trashRequest on trashq
280         trashList := list.New()
281         trashList.PushBack(trashRequest)
282
283         if !testData.UseTrashLifeTime {
284                 // Trash worker would not delete block if its Mtime is
285                 // within trash life time. Back-date the block to
286                 // allow the deletion to succeed.
287                 for _, mnt := range mounts {
288                         mnt.Volume.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
289                         if testData.DifferentMtimes {
290                                 oldBlockTime = oldBlockTime.Add(time.Second)
291                         }
292                 }
293         }
294         go RunTrashWorker(s.handler.volmgr, s.cluster, trashq)
295
296         // Install gate so all local operations block until we say go
297         gate := make(chan struct{})
298         for _, mnt := range mounts {
299                 mnt.Volume.(*MockVolume).Gate = gate
300         }
301
302         assertStatusItem := func(k string, expect float64) {
303                 if v := getStatusItem(s.handler, "TrashQueue", k); v != expect {
304                         c.Errorf("Got %s %v, expected %v", k, v, expect)
305                 }
306         }
307
308         assertStatusItem("InProgress", 0)
309         assertStatusItem("Queued", 0)
310
311         listLen := trashList.Len()
312         trashq.ReplaceQueue(trashList)
313
314         // Wait for worker to take request(s)
315         expectEqualWithin(c, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
316
317         // Ensure status.json also reports work is happening
318         assertStatusItem("InProgress", float64(1))
319         assertStatusItem("Queued", float64(listLen-1))
320
321         // Let worker proceed
322         close(gate)
323
324         // Wait for worker to finish
325         expectEqualWithin(c, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
326
327         // Verify Locator1 to be un/deleted as expected
328         buf := make([]byte, BlockSize)
329         size, err := GetBlock(context.Background(), s.handler.volmgr, testData.Locator1, buf, nil)
330         if testData.ExpectLocator1 {
331                 if size == 0 || err != nil {
332                         c.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
333                 }
334         } else {
335                 if size > 0 || err == nil {
336                         c.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
337                 }
338         }
339
340         // Verify Locator2 to be un/deleted as expected
341         if testData.Locator1 != testData.Locator2 {
342                 size, err = GetBlock(context.Background(), s.handler.volmgr, testData.Locator2, buf, nil)
343                 if testData.ExpectLocator2 {
344                         if size == 0 || err != nil {
345                                 c.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
346                         }
347                 } else {
348                         if size > 0 || err == nil {
349                                 c.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
350                         }
351                 }
352         }
353
354         // The DifferentMtimes test puts the same locator in two
355         // different volumes, but only one copy has an Mtime matching
356         // the trash request.
357         if testData.DifferentMtimes {
358                 locatorFoundIn := 0
359                 for _, volume := range s.handler.volmgr.AllReadable() {
360                         buf := make([]byte, BlockSize)
361                         if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
362                                 locatorFoundIn = locatorFoundIn + 1
363                         }
364                 }
365                 c.Check(locatorFoundIn, check.Equals, 1)
366         }
367 }