Merge branch '11590-log-reuse'
[arvados.git] / services / keepstore / trash_worker_test.go
1 package main
2
3 import (
4         "container/list"
5         "context"
6         "testing"
7         "time"
8 )
9
// TrashWorkerTestData describes one trash worker scenario: which
// blocks to create, which locator to ask the trash worker to delete,
// and which locators are expected to be readable afterwards.
type TrashWorkerTestData struct {
	// First test block. When CreateData is set it is written to the
	// first volume.
	Locator1    string
	Block1      []byte
	BlockMtime1 int64 // NOTE(review): not read anywhere in this file — confirm whether still needed

	// Second test block. When CreateData is set it is written to the
	// second volume, or to the first volume if CreateInVolume1 is set.
	Locator2    string
	Block2      []byte
	BlockMtime2 int64 // NOTE(review): not read anywhere in this file — confirm whether still needed

	// CreateData controls whether any blocks are written before the
	// trash request is issued; CreateInVolume1 places the second
	// block on the first volume instead of the second.
	CreateData      bool
	CreateInVolume1 bool

	// UseTrashLifeTime leaves the volumes' timestamps for
	// DeleteLocator untouched instead of back-dating them.
	// DifferentMtimes back-dates each successive volume one second
	// later than the previous one, so only the first volume's
	// timestamp equals the mtime in the trash request.
	UseTrashLifeTime bool
	DifferentMtimes  bool

	// DeleteLocator is the locator named in the trash request.
	// SpecifyMountUUID additionally sets the request's MountUUID to
	// the UUID of the first mount.
	DeleteLocator    string
	SpecifyMountUUID bool

	// Whether each locator should still be readable once the trash
	// worker has finished.
	ExpectLocator1 bool
	ExpectLocator2 bool
}
31
32 /* Delete block that does not exist in any of the keep volumes.
33    Expect no errors.
34 */
35 func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
36         theConfig.EnableDelete = true
37         testData := TrashWorkerTestData{
38                 Locator1: "5d41402abc4b2a76b9719d911017c592",
39                 Block1:   []byte("hello"),
40
41                 Locator2: "5d41402abc4b2a76b9719d911017c592",
42                 Block2:   []byte("hello"),
43
44                 CreateData: false,
45
46                 DeleteLocator: "5d41402abc4b2a76b9719d911017c592",
47
48                 ExpectLocator1: false,
49                 ExpectLocator2: false,
50         }
51         performTrashWorkerTest(testData, t)
52 }
53
54 /* Delete a block that exists on volume 1 of the keep servers.
55    Expect the second locator in volume 2 to be unaffected.
56 */
57 func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
58         theConfig.EnableDelete = true
59         testData := TrashWorkerTestData{
60                 Locator1: TestHash,
61                 Block1:   TestBlock,
62
63                 Locator2: TestHash2,
64                 Block2:   TestBlock2,
65
66                 CreateData: true,
67
68                 DeleteLocator: TestHash, // first locator
69
70                 ExpectLocator1: false,
71                 ExpectLocator2: true,
72         }
73         performTrashWorkerTest(testData, t)
74 }
75
76 /* Delete a block that exists on volume 2 of the keep servers.
77    Expect the first locator in volume 1 to be unaffected.
78 */
79 func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
80         theConfig.EnableDelete = true
81         testData := TrashWorkerTestData{
82                 Locator1: TestHash,
83                 Block1:   TestBlock,
84
85                 Locator2: TestHash2,
86                 Block2:   TestBlock2,
87
88                 CreateData: true,
89
90                 DeleteLocator: TestHash2, // locator 2
91
92                 ExpectLocator1: true,
93                 ExpectLocator2: false,
94         }
95         performTrashWorkerTest(testData, t)
96 }
97
98 /* Delete a block with matching mtime for locator in both volumes.
99    Expect locator to be deleted from both volumes.
100 */
101 func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
102         theConfig.EnableDelete = true
103         testData := TrashWorkerTestData{
104                 Locator1: TestHash,
105                 Block1:   TestBlock,
106
107                 Locator2: TestHash,
108                 Block2:   TestBlock,
109
110                 CreateData: true,
111
112                 DeleteLocator: TestHash,
113
114                 ExpectLocator1: false,
115                 ExpectLocator2: false,
116         }
117         performTrashWorkerTest(testData, t)
118 }
119
120 /* Same locator with different Mtimes exists in both volumes.
121    Delete the second and expect the first to be still around.
122 */
123 func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
124         theConfig.EnableDelete = true
125         testData := TrashWorkerTestData{
126                 Locator1: TestHash,
127                 Block1:   TestBlock,
128
129                 Locator2: TestHash,
130                 Block2:   TestBlock,
131
132                 CreateData:      true,
133                 DifferentMtimes: true,
134
135                 DeleteLocator: TestHash,
136
137                 ExpectLocator1: true,
138                 ExpectLocator2: false,
139         }
140         performTrashWorkerTest(testData, t)
141 }
142
143 // Delete a block that exists on both volumes with matching mtimes,
144 // but specify a MountUUID in the request so it only gets deleted from
145 // the first volume.
146 func TestTrashWorkerIntegration_SpecifyMountUUID(t *testing.T) {
147         theConfig.EnableDelete = true
148         testData := TrashWorkerTestData{
149                 Locator1: TestHash,
150                 Block1:   TestBlock,
151
152                 Locator2: TestHash,
153                 Block2:   TestBlock,
154
155                 CreateData: true,
156
157                 DeleteLocator:    TestHash,
158                 SpecifyMountUUID: true,
159
160                 ExpectLocator1: true,
161                 ExpectLocator2: true,
162         }
163         performTrashWorkerTest(testData, t)
164 }
165
166 /* Two different locators in volume 1.
167    Delete one of them.
168    Expect the other unaffected.
169 */
170 func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
171         theConfig.EnableDelete = true
172         testData := TrashWorkerTestData{
173                 Locator1: TestHash,
174                 Block1:   TestBlock,
175
176                 Locator2: TestHash2,
177                 Block2:   TestBlock2,
178
179                 CreateData:      true,
180                 CreateInVolume1: true,
181
182                 DeleteLocator: TestHash, // locator 1
183
184                 ExpectLocator1: false,
185                 ExpectLocator2: true,
186         }
187         performTrashWorkerTest(testData, t)
188 }
189
190 /* Allow default Trash Life time to be used. Thus, the newly created block
191    will not be deleted because its Mtime is within the trash life time.
192 */
193 func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
194         theConfig.EnableDelete = true
195         testData := TrashWorkerTestData{
196                 Locator1: TestHash,
197                 Block1:   TestBlock,
198
199                 Locator2: TestHash2,
200                 Block2:   TestBlock2,
201
202                 CreateData:      true,
203                 CreateInVolume1: true,
204
205                 UseTrashLifeTime: true,
206
207                 DeleteLocator: TestHash, // locator 1
208
209                 // Since trash life time is in effect, block won't be deleted.
210                 ExpectLocator1: true,
211                 ExpectLocator2: true,
212         }
213         performTrashWorkerTest(testData, t)
214 }
215
216 /* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
217    so block won't be deleted.
218 */
219 func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) {
220         theConfig.EnableDelete = false
221         testData := TrashWorkerTestData{
222                 Locator1: TestHash,
223                 Block1:   TestBlock,
224
225                 Locator2: TestHash,
226                 Block2:   TestBlock,
227
228                 CreateData: true,
229
230                 DeleteLocator: TestHash,
231
232                 ExpectLocator1: true,
233                 ExpectLocator2: true,
234         }
235         performTrashWorkerTest(testData, t)
236 }
237
238 /* Perform the test */
239 func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
240         // Create Keep Volumes
241         KeepVM = MakeTestVolumeManager(2)
242         defer KeepVM.Close()
243
244         // Put test content
245         vols := KeepVM.AllWritable()
246         if testData.CreateData {
247                 vols[0].Put(context.Background(), testData.Locator1, testData.Block1)
248                 vols[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
249
250                 if testData.CreateInVolume1 {
251                         vols[0].Put(context.Background(), testData.Locator2, testData.Block2)
252                         vols[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
253                 } else {
254                         vols[1].Put(context.Background(), testData.Locator2, testData.Block2)
255                         vols[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
256                 }
257         }
258
259         oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute)
260
261         // Create TrashRequest for the test
262         trashRequest := TrashRequest{
263                 Locator:    testData.DeleteLocator,
264                 BlockMtime: oldBlockTime.UnixNano(),
265         }
266         if testData.SpecifyMountUUID {
267                 trashRequest.MountUUID = KeepVM.Mounts()[0].UUID
268         }
269
270         // Run trash worker and put the trashRequest on trashq
271         trashList := list.New()
272         trashList.PushBack(trashRequest)
273         trashq = NewWorkQueue()
274         defer trashq.Close()
275
276         if !testData.UseTrashLifeTime {
277                 // Trash worker would not delete block if its Mtime is
278                 // within trash life time. Back-date the block to
279                 // allow the deletion to succeed.
280                 for _, v := range vols {
281                         v.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
282                         if testData.DifferentMtimes {
283                                 oldBlockTime = oldBlockTime.Add(time.Second)
284                         }
285                 }
286         }
287         go RunTrashWorker(trashq)
288
289         // Install gate so all local operations block until we say go
290         gate := make(chan struct{})
291         for _, v := range vols {
292                 v.(*MockVolume).Gate = gate
293         }
294
295         assertStatusItem := func(k string, expect float64) {
296                 if v := getStatusItem("TrashQueue", k); v != expect {
297                         t.Errorf("Got %s %v, expected %v", k, v, expect)
298                 }
299         }
300
301         assertStatusItem("InProgress", 0)
302         assertStatusItem("Queued", 0)
303
304         listLen := trashList.Len()
305         trashq.ReplaceQueue(trashList)
306
307         // Wait for worker to take request(s)
308         expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
309
310         // Ensure status.json also reports work is happening
311         assertStatusItem("InProgress", float64(1))
312         assertStatusItem("Queued", float64(listLen-1))
313
314         // Let worker proceed
315         close(gate)
316
317         // Wait for worker to finish
318         expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
319
320         // Verify Locator1 to be un/deleted as expected
321         buf := make([]byte, BlockSize)
322         size, err := GetBlock(context.Background(), testData.Locator1, buf, nil)
323         if testData.ExpectLocator1 {
324                 if size == 0 || err != nil {
325                         t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
326                 }
327         } else {
328                 if size > 0 || err == nil {
329                         t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
330                 }
331         }
332
333         // Verify Locator2 to be un/deleted as expected
334         if testData.Locator1 != testData.Locator2 {
335                 size, err = GetBlock(context.Background(), testData.Locator2, buf, nil)
336                 if testData.ExpectLocator2 {
337                         if size == 0 || err != nil {
338                                 t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
339                         }
340                 } else {
341                         if size > 0 || err == nil {
342                                 t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
343                         }
344                 }
345         }
346
347         // The DifferentMtimes test puts the same locator in two
348         // different volumes, but only one copy has an Mtime matching
349         // the trash request.
350         if testData.DifferentMtimes {
351                 locatorFoundIn := 0
352                 for _, volume := range KeepVM.AllReadable() {
353                         buf := make([]byte, BlockSize)
354                         if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
355                                 locatorFoundIn = locatorFoundIn + 1
356                         }
357                 }
358                 if locatorFoundIn != 1 {
359                         t.Errorf("Found %d copies of %s, expected 1", locatorFoundIn, testData.Locator1)
360                 }
361         }
362 }