Merge branch '10467-client-disconnect' refs #10467
[arvados.git] / services / keepstore / trash_worker_test.go
1 package main
2
3 import (
4         "container/list"
5         "context"
6         "testing"
7         "time"
8 )
9
// TrashWorkerTestData describes a single trash worker scenario: which blocks
// are seeded into the test volumes, which locator the trash request names,
// and which locators must still be readable once the worker has finished.
type TrashWorkerTestData struct {
	// Locator1 is written to the first volume when CreateData is set.
	Locator1 string

	// Block1 is the content stored under Locator1.
	Block1 []byte

	// BlockMtime1 is not read by performTrashWorkerTest.
	BlockMtime1 int64

	// Locator2 is written to the first volume when CreateInVolume1 is
	// set, and to the second volume otherwise.
	Locator2 string

	// Block2 is the content stored under Locator2.
	Block2 []byte

	// BlockMtime2 is not read by performTrashWorkerTest.
	BlockMtime2 int64

	// CreateData controls whether any block is written before the trash
	// request is issued.
	CreateData bool

	// CreateInVolume1 places Locator2 on the first volume instead of the
	// second one.
	CreateInVolume1 bool

	// UseTrashLifeTime skips back-dating of the stored timestamps, so
	// the blocks keep the modification time they got when written.
	UseTrashLifeTime bool

	// DifferentMtimes makes each volume's back-dated timestamp one
	// second later than the previous volume's.
	DifferentMtimes bool

	// DeleteLocator is the locator named in the trash request.
	DeleteLocator string

	// ExpectLocator1 states whether Locator1 must still be readable
	// after the worker ran.
	ExpectLocator1 bool

	// ExpectLocator2 states whether Locator2 must still be readable
	// after the worker ran. It is only checked when Locator2 differs
	// from Locator1.
	ExpectLocator2 bool
}
30
31 /* Delete block that does not exist in any of the keep volumes.
32    Expect no errors.
33 */
34 func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
35         theConfig.EnableDelete = true
36         testData := TrashWorkerTestData{
37                 Locator1: "5d41402abc4b2a76b9719d911017c592",
38                 Block1:   []byte("hello"),
39
40                 Locator2: "5d41402abc4b2a76b9719d911017c592",
41                 Block2:   []byte("hello"),
42
43                 CreateData: false,
44
45                 DeleteLocator: "5d41402abc4b2a76b9719d911017c592",
46
47                 ExpectLocator1: false,
48                 ExpectLocator2: false,
49         }
50         performTrashWorkerTest(testData, t)
51 }
52
53 /* Delete a block that exists on volume 1 of the keep servers.
54    Expect the second locator in volume 2 to be unaffected.
55 */
56 func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
57         theConfig.EnableDelete = true
58         testData := TrashWorkerTestData{
59                 Locator1: TestHash,
60                 Block1:   TestBlock,
61
62                 Locator2: TestHash2,
63                 Block2:   TestBlock2,
64
65                 CreateData: true,
66
67                 DeleteLocator: TestHash, // first locator
68
69                 ExpectLocator1: false,
70                 ExpectLocator2: true,
71         }
72         performTrashWorkerTest(testData, t)
73 }
74
75 /* Delete a block that exists on volume 2 of the keep servers.
76    Expect the first locator in volume 1 to be unaffected.
77 */
78 func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
79         theConfig.EnableDelete = true
80         testData := TrashWorkerTestData{
81                 Locator1: TestHash,
82                 Block1:   TestBlock,
83
84                 Locator2: TestHash2,
85                 Block2:   TestBlock2,
86
87                 CreateData: true,
88
89                 DeleteLocator: TestHash2, // locator 2
90
91                 ExpectLocator1: true,
92                 ExpectLocator2: false,
93         }
94         performTrashWorkerTest(testData, t)
95 }
96
97 /* Delete a block with matching mtime for locator in both volumes.
98    Expect locator to be deleted from both volumes.
99 */
100 func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
101         theConfig.EnableDelete = true
102         testData := TrashWorkerTestData{
103                 Locator1: TestHash,
104                 Block1:   TestBlock,
105
106                 Locator2: TestHash,
107                 Block2:   TestBlock,
108
109                 CreateData: true,
110
111                 DeleteLocator: TestHash,
112
113                 ExpectLocator1: false,
114                 ExpectLocator2: false,
115         }
116         performTrashWorkerTest(testData, t)
117 }
118
119 /* Same locator with different Mtimes exists in both volumes.
120    Delete the second and expect the first to be still around.
121 */
122 func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
123         theConfig.EnableDelete = true
124         testData := TrashWorkerTestData{
125                 Locator1: TestHash,
126                 Block1:   TestBlock,
127
128                 Locator2: TestHash,
129                 Block2:   TestBlock,
130
131                 CreateData:      true,
132                 DifferentMtimes: true,
133
134                 DeleteLocator: TestHash,
135
136                 ExpectLocator1: true,
137                 ExpectLocator2: false,
138         }
139         performTrashWorkerTest(testData, t)
140 }
141
142 /* Two different locators in volume 1.
143    Delete one of them.
144    Expect the other unaffected.
145 */
146 func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
147         theConfig.EnableDelete = true
148         testData := TrashWorkerTestData{
149                 Locator1: TestHash,
150                 Block1:   TestBlock,
151
152                 Locator2: TestHash2,
153                 Block2:   TestBlock2,
154
155                 CreateData:      true,
156                 CreateInVolume1: true,
157
158                 DeleteLocator: TestHash, // locator 1
159
160                 ExpectLocator1: false,
161                 ExpectLocator2: true,
162         }
163         performTrashWorkerTest(testData, t)
164 }
165
166 /* Allow default Trash Life time to be used. Thus, the newly created block
167    will not be deleted because its Mtime is within the trash life time.
168 */
169 func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
170         theConfig.EnableDelete = true
171         testData := TrashWorkerTestData{
172                 Locator1: TestHash,
173                 Block1:   TestBlock,
174
175                 Locator2: TestHash2,
176                 Block2:   TestBlock2,
177
178                 CreateData:      true,
179                 CreateInVolume1: true,
180
181                 UseTrashLifeTime: true,
182
183                 DeleteLocator: TestHash, // locator 1
184
185                 // Since trash life time is in effect, block won't be deleted.
186                 ExpectLocator1: true,
187                 ExpectLocator2: true,
188         }
189         performTrashWorkerTest(testData, t)
190 }
191
192 /* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
193    so block won't be deleted.
194 */
195 func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) {
196         theConfig.EnableDelete = false
197         testData := TrashWorkerTestData{
198                 Locator1: TestHash,
199                 Block1:   TestBlock,
200
201                 Locator2: TestHash,
202                 Block2:   TestBlock,
203
204                 CreateData: true,
205
206                 DeleteLocator: TestHash,
207
208                 ExpectLocator1: true,
209                 ExpectLocator2: true,
210         }
211         performTrashWorkerTest(testData, t)
212 }
213
214 /* Perform the test */
215 func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
216         // Create Keep Volumes
217         KeepVM = MakeTestVolumeManager(2)
218         defer KeepVM.Close()
219
220         // Put test content
221         vols := KeepVM.AllWritable()
222         if testData.CreateData {
223                 vols[0].Put(context.Background(), testData.Locator1, testData.Block1)
224                 vols[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
225
226                 if testData.CreateInVolume1 {
227                         vols[0].Put(context.Background(), testData.Locator2, testData.Block2)
228                         vols[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
229                 } else {
230                         vols[1].Put(context.Background(), testData.Locator2, testData.Block2)
231                         vols[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
232                 }
233         }
234
235         oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute)
236
237         // Create TrashRequest for the test
238         trashRequest := TrashRequest{
239                 Locator:    testData.DeleteLocator,
240                 BlockMtime: oldBlockTime.UnixNano(),
241         }
242
243         // Run trash worker and put the trashRequest on trashq
244         trashList := list.New()
245         trashList.PushBack(trashRequest)
246         trashq = NewWorkQueue()
247         defer trashq.Close()
248
249         if !testData.UseTrashLifeTime {
250                 // Trash worker would not delete block if its Mtime is
251                 // within trash life time. Back-date the block to
252                 // allow the deletion to succeed.
253                 for _, v := range vols {
254                         v.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
255                         if testData.DifferentMtimes {
256                                 oldBlockTime = oldBlockTime.Add(time.Second)
257                         }
258                 }
259         }
260         go RunTrashWorker(trashq)
261
262         // Install gate so all local operations block until we say go
263         gate := make(chan struct{})
264         for _, v := range vols {
265                 v.(*MockVolume).Gate = gate
266         }
267
268         assertStatusItem := func(k string, expect float64) {
269                 if v := getStatusItem("TrashQueue", k); v != expect {
270                         t.Errorf("Got %s %v, expected %v", k, v, expect)
271                 }
272         }
273
274         assertStatusItem("InProgress", 0)
275         assertStatusItem("Queued", 0)
276
277         listLen := trashList.Len()
278         trashq.ReplaceQueue(trashList)
279
280         // Wait for worker to take request(s)
281         expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
282
283         // Ensure status.json also reports work is happening
284         assertStatusItem("InProgress", float64(1))
285         assertStatusItem("Queued", float64(listLen-1))
286
287         // Let worker proceed
288         close(gate)
289
290         // Wait for worker to finish
291         expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
292
293         // Verify Locator1 to be un/deleted as expected
294         buf := make([]byte, BlockSize)
295         size, err := GetBlock(context.Background(), testData.Locator1, buf, nil)
296         if testData.ExpectLocator1 {
297                 if size == 0 || err != nil {
298                         t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
299                 }
300         } else {
301                 if size > 0 || err == nil {
302                         t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
303                 }
304         }
305
306         // Verify Locator2 to be un/deleted as expected
307         if testData.Locator1 != testData.Locator2 {
308                 size, err = GetBlock(context.Background(), testData.Locator2, buf, nil)
309                 if testData.ExpectLocator2 {
310                         if size == 0 || err != nil {
311                                 t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
312                         }
313                 } else {
314                         if size > 0 || err == nil {
315                                 t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
316                         }
317                 }
318         }
319
320         // The DifferentMtimes test puts the same locator in two
321         // different volumes, but only one copy has an Mtime matching
322         // the trash request.
323         if testData.DifferentMtimes {
324                 locatorFoundIn := 0
325                 for _, volume := range KeepVM.AllReadable() {
326                         buf := make([]byte, BlockSize)
327                         if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
328                                 locatorFoundIn = locatorFoundIn + 1
329                         }
330                 }
331                 if locatorFoundIn != 1 {
332                         t.Errorf("Found %d copies of %s, expected 1", locatorFoundIn, testData.Locator1)
333                 }
334         }
335 }