12715: Rename flag !reading => alwaysReadEOF.
[arvados.git] / services / keepstore / trash_worker_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "testing"
11         "time"
12 )
13
14 type TrashWorkerTestData struct {
15         Locator1    string
16         Block1      []byte
17         BlockMtime1 int64
18
19         Locator2    string
20         Block2      []byte
21         BlockMtime2 int64
22
23         CreateData      bool
24         CreateInVolume1 bool
25
26         UseTrashLifeTime bool
27         DifferentMtimes  bool
28
29         DeleteLocator    string
30         SpecifyMountUUID bool
31
32         ExpectLocator1 bool
33         ExpectLocator2 bool
34 }
35
36 /* Delete block that does not exist in any of the keep volumes.
37    Expect no errors.
38 */
39 func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
40         theConfig.EnableDelete = true
41         testData := TrashWorkerTestData{
42                 Locator1: "5d41402abc4b2a76b9719d911017c592",
43                 Block1:   []byte("hello"),
44
45                 Locator2: "5d41402abc4b2a76b9719d911017c592",
46                 Block2:   []byte("hello"),
47
48                 CreateData: false,
49
50                 DeleteLocator: "5d41402abc4b2a76b9719d911017c592",
51
52                 ExpectLocator1: false,
53                 ExpectLocator2: false,
54         }
55         performTrashWorkerTest(testData, t)
56 }
57
58 /* Delete a block that exists on volume 1 of the keep servers.
59    Expect the second locator in volume 2 to be unaffected.
60 */
61 func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
62         theConfig.EnableDelete = true
63         testData := TrashWorkerTestData{
64                 Locator1: TestHash,
65                 Block1:   TestBlock,
66
67                 Locator2: TestHash2,
68                 Block2:   TestBlock2,
69
70                 CreateData: true,
71
72                 DeleteLocator: TestHash, // first locator
73
74                 ExpectLocator1: false,
75                 ExpectLocator2: true,
76         }
77         performTrashWorkerTest(testData, t)
78 }
79
80 /* Delete a block that exists on volume 2 of the keep servers.
81    Expect the first locator in volume 1 to be unaffected.
82 */
83 func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
84         theConfig.EnableDelete = true
85         testData := TrashWorkerTestData{
86                 Locator1: TestHash,
87                 Block1:   TestBlock,
88
89                 Locator2: TestHash2,
90                 Block2:   TestBlock2,
91
92                 CreateData: true,
93
94                 DeleteLocator: TestHash2, // locator 2
95
96                 ExpectLocator1: true,
97                 ExpectLocator2: false,
98         }
99         performTrashWorkerTest(testData, t)
100 }
101
102 /* Delete a block with matching mtime for locator in both volumes.
103    Expect locator to be deleted from both volumes.
104 */
105 func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
106         theConfig.EnableDelete = true
107         testData := TrashWorkerTestData{
108                 Locator1: TestHash,
109                 Block1:   TestBlock,
110
111                 Locator2: TestHash,
112                 Block2:   TestBlock,
113
114                 CreateData: true,
115
116                 DeleteLocator: TestHash,
117
118                 ExpectLocator1: false,
119                 ExpectLocator2: false,
120         }
121         performTrashWorkerTest(testData, t)
122 }
123
124 /* Same locator with different Mtimes exists in both volumes.
125    Delete the second and expect the first to be still around.
126 */
127 func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
128         theConfig.EnableDelete = true
129         testData := TrashWorkerTestData{
130                 Locator1: TestHash,
131                 Block1:   TestBlock,
132
133                 Locator2: TestHash,
134                 Block2:   TestBlock,
135
136                 CreateData:      true,
137                 DifferentMtimes: true,
138
139                 DeleteLocator: TestHash,
140
141                 ExpectLocator1: true,
142                 ExpectLocator2: false,
143         }
144         performTrashWorkerTest(testData, t)
145 }
146
147 // Delete a block that exists on both volumes with matching mtimes,
148 // but specify a MountUUID in the request so it only gets deleted from
149 // the first volume.
150 func TestTrashWorkerIntegration_SpecifyMountUUID(t *testing.T) {
151         theConfig.EnableDelete = true
152         testData := TrashWorkerTestData{
153                 Locator1: TestHash,
154                 Block1:   TestBlock,
155
156                 Locator2: TestHash,
157                 Block2:   TestBlock,
158
159                 CreateData: true,
160
161                 DeleteLocator:    TestHash,
162                 SpecifyMountUUID: true,
163
164                 ExpectLocator1: true,
165                 ExpectLocator2: true,
166         }
167         performTrashWorkerTest(testData, t)
168 }
169
170 /* Two different locators in volume 1.
171    Delete one of them.
172    Expect the other unaffected.
173 */
174 func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
175         theConfig.EnableDelete = true
176         testData := TrashWorkerTestData{
177                 Locator1: TestHash,
178                 Block1:   TestBlock,
179
180                 Locator2: TestHash2,
181                 Block2:   TestBlock2,
182
183                 CreateData:      true,
184                 CreateInVolume1: true,
185
186                 DeleteLocator: TestHash, // locator 1
187
188                 ExpectLocator1: false,
189                 ExpectLocator2: true,
190         }
191         performTrashWorkerTest(testData, t)
192 }
193
194 /* Allow default Trash Life time to be used. Thus, the newly created block
195    will not be deleted because its Mtime is within the trash life time.
196 */
197 func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
198         theConfig.EnableDelete = true
199         testData := TrashWorkerTestData{
200                 Locator1: TestHash,
201                 Block1:   TestBlock,
202
203                 Locator2: TestHash2,
204                 Block2:   TestBlock2,
205
206                 CreateData:      true,
207                 CreateInVolume1: true,
208
209                 UseTrashLifeTime: true,
210
211                 DeleteLocator: TestHash, // locator 1
212
213                 // Since trash life time is in effect, block won't be deleted.
214                 ExpectLocator1: true,
215                 ExpectLocator2: true,
216         }
217         performTrashWorkerTest(testData, t)
218 }
219
220 /* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
221    so block won't be deleted.
222 */
223 func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) {
224         theConfig.EnableDelete = false
225         testData := TrashWorkerTestData{
226                 Locator1: TestHash,
227                 Block1:   TestBlock,
228
229                 Locator2: TestHash,
230                 Block2:   TestBlock,
231
232                 CreateData: true,
233
234                 DeleteLocator: TestHash,
235
236                 ExpectLocator1: true,
237                 ExpectLocator2: true,
238         }
239         performTrashWorkerTest(testData, t)
240 }
241
242 /* Perform the test */
243 func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
244         // Create Keep Volumes
245         KeepVM = MakeTestVolumeManager(2)
246         defer KeepVM.Close()
247
248         // Put test content
249         vols := KeepVM.AllWritable()
250         if testData.CreateData {
251                 vols[0].Put(context.Background(), testData.Locator1, testData.Block1)
252                 vols[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
253
254                 if testData.CreateInVolume1 {
255                         vols[0].Put(context.Background(), testData.Locator2, testData.Block2)
256                         vols[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
257                 } else {
258                         vols[1].Put(context.Background(), testData.Locator2, testData.Block2)
259                         vols[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
260                 }
261         }
262
263         oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute)
264
265         // Create TrashRequest for the test
266         trashRequest := TrashRequest{
267                 Locator:    testData.DeleteLocator,
268                 BlockMtime: oldBlockTime.UnixNano(),
269         }
270         if testData.SpecifyMountUUID {
271                 trashRequest.MountUUID = KeepVM.Mounts()[0].UUID
272         }
273
274         // Run trash worker and put the trashRequest on trashq
275         trashList := list.New()
276         trashList.PushBack(trashRequest)
277         trashq = NewWorkQueue()
278         defer trashq.Close()
279
280         if !testData.UseTrashLifeTime {
281                 // Trash worker would not delete block if its Mtime is
282                 // within trash life time. Back-date the block to
283                 // allow the deletion to succeed.
284                 for _, v := range vols {
285                         v.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
286                         if testData.DifferentMtimes {
287                                 oldBlockTime = oldBlockTime.Add(time.Second)
288                         }
289                 }
290         }
291         go RunTrashWorker(trashq)
292
293         // Install gate so all local operations block until we say go
294         gate := make(chan struct{})
295         for _, v := range vols {
296                 v.(*MockVolume).Gate = gate
297         }
298
299         assertStatusItem := func(k string, expect float64) {
300                 if v := getStatusItem("TrashQueue", k); v != expect {
301                         t.Errorf("Got %s %v, expected %v", k, v, expect)
302                 }
303         }
304
305         assertStatusItem("InProgress", 0)
306         assertStatusItem("Queued", 0)
307
308         listLen := trashList.Len()
309         trashq.ReplaceQueue(trashList)
310
311         // Wait for worker to take request(s)
312         expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
313
314         // Ensure status.json also reports work is happening
315         assertStatusItem("InProgress", float64(1))
316         assertStatusItem("Queued", float64(listLen-1))
317
318         // Let worker proceed
319         close(gate)
320
321         // Wait for worker to finish
322         expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
323
324         // Verify Locator1 to be un/deleted as expected
325         buf := make([]byte, BlockSize)
326         size, err := GetBlock(context.Background(), testData.Locator1, buf, nil)
327         if testData.ExpectLocator1 {
328                 if size == 0 || err != nil {
329                         t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
330                 }
331         } else {
332                 if size > 0 || err == nil {
333                         t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
334                 }
335         }
336
337         // Verify Locator2 to be un/deleted as expected
338         if testData.Locator1 != testData.Locator2 {
339                 size, err = GetBlock(context.Background(), testData.Locator2, buf, nil)
340                 if testData.ExpectLocator2 {
341                         if size == 0 || err != nil {
342                                 t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
343                         }
344                 } else {
345                         if size > 0 || err == nil {
346                                 t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
347                         }
348                 }
349         }
350
351         // The DifferentMtimes test puts the same locator in two
352         // different volumes, but only one copy has an Mtime matching
353         // the trash request.
354         if testData.DifferentMtimes {
355                 locatorFoundIn := 0
356                 for _, volume := range KeepVM.AllReadable() {
357                         buf := make([]byte, BlockSize)
358                         if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
359                                 locatorFoundIn = locatorFoundIn + 1
360                         }
361                 }
362                 if locatorFoundIn != 1 {
363                         t.Errorf("Found %d copies of %s, expected 1", locatorFoundIn, testData.Locator1)
364                 }
365         }
366 }