documentation: update descriptions for MaxKeepBlobBuffers and MaxConcurrentRequests
[arvados.git] / services / keepstore / trash_worker_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/ctxlog"
13         "github.com/prometheus/client_golang/prometheus"
14         check "gopkg.in/check.v1"
15 )
16
17 type TrashWorkerTestData struct {
18         Locator1    string
19         Block1      []byte
20         BlockMtime1 int64
21
22         Locator2    string
23         Block2      []byte
24         BlockMtime2 int64
25
26         CreateData      bool
27         CreateInVolume1 bool
28
29         UseTrashLifeTime bool
30         DifferentMtimes  bool
31
32         DeleteLocator    string
33         SpecifyMountUUID bool
34
35         ExpectLocator1 bool
36         ExpectLocator2 bool
37 }
38
39 /* Delete block that does not exist in any of the keep volumes.
40    Expect no errors.
41 */
42 func (s *HandlerSuite) TestTrashWorkerIntegration_GetNonExistingLocator(c *check.C) {
43         s.cluster.Collections.BlobTrash = true
44         testData := TrashWorkerTestData{
45                 Locator1: "5d41402abc4b2a76b9719d911017c592",
46                 Block1:   []byte("hello"),
47
48                 Locator2: "5d41402abc4b2a76b9719d911017c592",
49                 Block2:   []byte("hello"),
50
51                 CreateData: false,
52
53                 DeleteLocator: "5d41402abc4b2a76b9719d911017c592",
54
55                 ExpectLocator1: false,
56                 ExpectLocator2: false,
57         }
58         s.performTrashWorkerTest(c, testData)
59 }
60
61 /* Delete a block that exists on volume 1 of the keep servers.
62    Expect the second locator in volume 2 to be unaffected.
63 */
64 func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume1(c *check.C) {
65         s.cluster.Collections.BlobTrash = true
66         testData := TrashWorkerTestData{
67                 Locator1: TestHash,
68                 Block1:   TestBlock,
69
70                 Locator2: TestHash2,
71                 Block2:   TestBlock2,
72
73                 CreateData: true,
74
75                 DeleteLocator: TestHash, // first locator
76
77                 ExpectLocator1: false,
78                 ExpectLocator2: true,
79         }
80         s.performTrashWorkerTest(c, testData)
81 }
82
83 /* Delete a block that exists on volume 2 of the keep servers.
84    Expect the first locator in volume 1 to be unaffected.
85 */
86 func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume2(c *check.C) {
87         s.cluster.Collections.BlobTrash = true
88         testData := TrashWorkerTestData{
89                 Locator1: TestHash,
90                 Block1:   TestBlock,
91
92                 Locator2: TestHash2,
93                 Block2:   TestBlock2,
94
95                 CreateData: true,
96
97                 DeleteLocator: TestHash2, // locator 2
98
99                 ExpectLocator1: true,
100                 ExpectLocator2: false,
101         }
102         s.performTrashWorkerTest(c, testData)
103 }
104
105 /* Delete a block with matching mtime for locator in both volumes.
106    Expect locator to be deleted from both volumes.
107 */
108 func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInBothVolumes(c *check.C) {
109         s.cluster.Collections.BlobTrash = true
110         testData := TrashWorkerTestData{
111                 Locator1: TestHash,
112                 Block1:   TestBlock,
113
114                 Locator2: TestHash,
115                 Block2:   TestBlock,
116
117                 CreateData: true,
118
119                 DeleteLocator: TestHash,
120
121                 ExpectLocator1: false,
122                 ExpectLocator2: false,
123         }
124         s.performTrashWorkerTest(c, testData)
125 }
126
127 /* Same locator with different Mtimes exists in both volumes.
128    Delete the second and expect the first to be still around.
129 */
130 func (s *HandlerSuite) TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(c *check.C) {
131         s.cluster.Collections.BlobTrash = true
132         testData := TrashWorkerTestData{
133                 Locator1: TestHash,
134                 Block1:   TestBlock,
135
136                 Locator2: TestHash,
137                 Block2:   TestBlock,
138
139                 CreateData:      true,
140                 DifferentMtimes: true,
141
142                 DeleteLocator: TestHash,
143
144                 ExpectLocator1: true,
145                 ExpectLocator2: false,
146         }
147         s.performTrashWorkerTest(c, testData)
148 }
149
150 // Delete a block that exists on both volumes with matching mtimes,
151 // but specify a MountUUID in the request so it only gets deleted from
152 // the first volume.
153 func (s *HandlerSuite) TestTrashWorkerIntegration_SpecifyMountUUID(c *check.C) {
154         s.cluster.Collections.BlobTrash = true
155         testData := TrashWorkerTestData{
156                 Locator1: TestHash,
157                 Block1:   TestBlock,
158
159                 Locator2: TestHash,
160                 Block2:   TestBlock,
161
162                 CreateData: true,
163
164                 DeleteLocator:    TestHash,
165                 SpecifyMountUUID: true,
166
167                 ExpectLocator1: true,
168                 ExpectLocator2: true,
169         }
170         s.performTrashWorkerTest(c, testData)
171 }
172
173 /* Two different locators in volume 1.
174    Delete one of them.
175    Expect the other unaffected.
176 */
177 func (s *HandlerSuite) TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(c *check.C) {
178         s.cluster.Collections.BlobTrash = true
179         testData := TrashWorkerTestData{
180                 Locator1: TestHash,
181                 Block1:   TestBlock,
182
183                 Locator2: TestHash2,
184                 Block2:   TestBlock2,
185
186                 CreateData:      true,
187                 CreateInVolume1: true,
188
189                 DeleteLocator: TestHash, // locator 1
190
191                 ExpectLocator1: false,
192                 ExpectLocator2: true,
193         }
194         s.performTrashWorkerTest(c, testData)
195 }
196
197 /* Allow default Trash Life time to be used. Thus, the newly created block
198    will not be deleted because its Mtime is within the trash life time.
199 */
200 func (s *HandlerSuite) TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(c *check.C) {
201         s.cluster.Collections.BlobTrash = true
202         testData := TrashWorkerTestData{
203                 Locator1: TestHash,
204                 Block1:   TestBlock,
205
206                 Locator2: TestHash2,
207                 Block2:   TestBlock2,
208
209                 CreateData:      true,
210                 CreateInVolume1: true,
211
212                 UseTrashLifeTime: true,
213
214                 DeleteLocator: TestHash, // locator 1
215
216                 // Since trash life time is in effect, block won't be deleted.
217                 ExpectLocator1: true,
218                 ExpectLocator2: true,
219         }
220         s.performTrashWorkerTest(c, testData)
221 }
222
223 /* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
224    so block won't be deleted.
225 */
226 func (s *HandlerSuite) TestTrashWorkerIntegration_DisabledDelete(c *check.C) {
227         s.cluster.Collections.BlobTrash = false
228         testData := TrashWorkerTestData{
229                 Locator1: TestHash,
230                 Block1:   TestBlock,
231
232                 Locator2: TestHash,
233                 Block2:   TestBlock,
234
235                 CreateData: true,
236
237                 DeleteLocator: TestHash,
238
239                 ExpectLocator1: true,
240                 ExpectLocator2: true,
241         }
242         s.performTrashWorkerTest(c, testData)
243 }
244
245 /* Perform the test */
246 func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTestData) {
247         c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
248         // Replace the router's trashq -- which the worker goroutines
249         // started by setup() are now receiving from -- with a new
250         // one, so we can see what the handler sends to it.
251         trashq := NewWorkQueue()
252         s.handler.Handler.(*router).trashq = trashq
253
254         // Put test content
255         mounts := s.handler.volmgr.AllWritable()
256         if testData.CreateData {
257                 mounts[0].Put(context.Background(), testData.Locator1, testData.Block1)
258                 mounts[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
259
260                 if testData.CreateInVolume1 {
261                         mounts[0].Put(context.Background(), testData.Locator2, testData.Block2)
262                         mounts[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
263                 } else {
264                         mounts[1].Put(context.Background(), testData.Locator2, testData.Block2)
265                         mounts[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
266                 }
267         }
268
269         oldBlockTime := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Minute)
270
271         // Create TrashRequest for the test
272         trashRequest := TrashRequest{
273                 Locator:    testData.DeleteLocator,
274                 BlockMtime: oldBlockTime.UnixNano(),
275         }
276         if testData.SpecifyMountUUID {
277                 trashRequest.MountUUID = s.handler.volmgr.Mounts()[0].UUID
278         }
279
280         // Run trash worker and put the trashRequest on trashq
281         trashList := list.New()
282         trashList.PushBack(trashRequest)
283
284         if !testData.UseTrashLifeTime {
285                 // Trash worker would not delete block if its Mtime is
286                 // within trash life time. Back-date the block to
287                 // allow the deletion to succeed.
288                 for _, mnt := range mounts {
289                         mnt.Volume.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
290                         if testData.DifferentMtimes {
291                                 oldBlockTime = oldBlockTime.Add(time.Second)
292                         }
293                 }
294         }
295         go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)
296
297         // Install gate so all local operations block until we say go
298         gate := make(chan struct{})
299         for _, mnt := range mounts {
300                 mnt.Volume.(*MockVolume).Gate = gate
301         }
302
303         assertStatusItem := func(k string, expect float64) {
304                 if v := getStatusItem(s.handler, "TrashQueue", k); v != expect {
305                         c.Errorf("Got %s %v, expected %v", k, v, expect)
306                 }
307         }
308
309         assertStatusItem("InProgress", 0)
310         assertStatusItem("Queued", 0)
311
312         listLen := trashList.Len()
313         trashq.ReplaceQueue(trashList)
314
315         // Wait for worker to take request(s)
316         expectEqualWithin(c, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
317
318         // Ensure status.json also reports work is happening
319         assertStatusItem("InProgress", float64(1))
320         assertStatusItem("Queued", float64(listLen-1))
321
322         // Let worker proceed
323         close(gate)
324
325         // Wait for worker to finish
326         expectEqualWithin(c, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
327
328         // Verify Locator1 to be un/deleted as expected
329         buf := make([]byte, BlockSize)
330         size, err := GetBlock(context.Background(), s.handler.volmgr, testData.Locator1, buf, nil)
331         if testData.ExpectLocator1 {
332                 if size == 0 || err != nil {
333                         c.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
334                 }
335         } else {
336                 if size > 0 || err == nil {
337                         c.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
338                 }
339         }
340
341         // Verify Locator2 to be un/deleted as expected
342         if testData.Locator1 != testData.Locator2 {
343                 size, err = GetBlock(context.Background(), s.handler.volmgr, testData.Locator2, buf, nil)
344                 if testData.ExpectLocator2 {
345                         if size == 0 || err != nil {
346                                 c.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
347                         }
348                 } else {
349                         if size > 0 || err == nil {
350                                 c.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
351                         }
352                 }
353         }
354
355         // The DifferentMtimes test puts the same locator in two
356         // different volumes, but only one copy has an Mtime matching
357         // the trash request.
358         if testData.DifferentMtimes {
359                 locatorFoundIn := 0
360                 for _, volume := range s.handler.volmgr.AllReadable() {
361                         buf := make([]byte, BlockSize)
362                         if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
363                                 locatorFoundIn = locatorFoundIn + 1
364                         }
365                 }
366                 c.Check(locatorFoundIn, check.Equals, 1)
367         }
368 }