Merge branch '9552-compute-checksum-final-output' refs #9552
[arvados.git] / services / keepstore / trash_worker_test.go
1 package main
2
3 import (
4         "container/list"
5         "testing"
6         "time"
7 )
8
9 type TrashWorkerTestData struct {
10         Locator1    string
11         Block1      []byte
12         BlockMtime1 int64
13
14         Locator2    string
15         Block2      []byte
16         BlockMtime2 int64
17
18         CreateData      bool
19         CreateInVolume1 bool
20
21         UseTrashLifeTime bool
22         DifferentMtimes  bool
23
24         DeleteLocator string
25
26         ExpectLocator1 bool
27         ExpectLocator2 bool
28 }
29
30 /* Delete block that does not exist in any of the keep volumes.
31    Expect no errors.
32 */
33 func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
34         neverDelete = false
35         testData := TrashWorkerTestData{
36                 Locator1: "5d41402abc4b2a76b9719d911017c592",
37                 Block1:   []byte("hello"),
38
39                 Locator2: "5d41402abc4b2a76b9719d911017c592",
40                 Block2:   []byte("hello"),
41
42                 CreateData: false,
43
44                 DeleteLocator: "5d41402abc4b2a76b9719d911017c592",
45
46                 ExpectLocator1: false,
47                 ExpectLocator2: false,
48         }
49         performTrashWorkerTest(testData, t)
50 }
51
52 /* Delete a block that exists on volume 1 of the keep servers.
53    Expect the second locator in volume 2 to be unaffected.
54 */
55 func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
56         neverDelete = false
57         testData := TrashWorkerTestData{
58                 Locator1: TestHash,
59                 Block1:   TestBlock,
60
61                 Locator2: TestHash2,
62                 Block2:   TestBlock2,
63
64                 CreateData: true,
65
66                 DeleteLocator: TestHash, // first locator
67
68                 ExpectLocator1: false,
69                 ExpectLocator2: true,
70         }
71         performTrashWorkerTest(testData, t)
72 }
73
74 /* Delete a block that exists on volume 2 of the keep servers.
75    Expect the first locator in volume 1 to be unaffected.
76 */
77 func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
78         neverDelete = false
79         testData := TrashWorkerTestData{
80                 Locator1: TestHash,
81                 Block1:   TestBlock,
82
83                 Locator2: TestHash2,
84                 Block2:   TestBlock2,
85
86                 CreateData: true,
87
88                 DeleteLocator: TestHash2, // locator 2
89
90                 ExpectLocator1: true,
91                 ExpectLocator2: false,
92         }
93         performTrashWorkerTest(testData, t)
94 }
95
96 /* Delete a block with matching mtime for locator in both volumes.
97    Expect locator to be deleted from both volumes.
98 */
99 func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
100         neverDelete = false
101         testData := TrashWorkerTestData{
102                 Locator1: TestHash,
103                 Block1:   TestBlock,
104
105                 Locator2: TestHash,
106                 Block2:   TestBlock,
107
108                 CreateData: true,
109
110                 DeleteLocator: TestHash,
111
112                 ExpectLocator1: false,
113                 ExpectLocator2: false,
114         }
115         performTrashWorkerTest(testData, t)
116 }
117
118 /* Same locator with different Mtimes exists in both volumes.
119    Delete the second and expect the first to be still around.
120 */
121 func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
122         neverDelete = false
123         testData := TrashWorkerTestData{
124                 Locator1: TestHash,
125                 Block1:   TestBlock,
126
127                 Locator2: TestHash,
128                 Block2:   TestBlock,
129
130                 CreateData:      true,
131                 DifferentMtimes: true,
132
133                 DeleteLocator: TestHash,
134
135                 ExpectLocator1: true,
136                 ExpectLocator2: false,
137         }
138         performTrashWorkerTest(testData, t)
139 }
140
141 /* Two different locators in volume 1.
142    Delete one of them.
143    Expect the other unaffected.
144 */
145 func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
146         neverDelete = false
147         testData := TrashWorkerTestData{
148                 Locator1: TestHash,
149                 Block1:   TestBlock,
150
151                 Locator2: TestHash2,
152                 Block2:   TestBlock2,
153
154                 CreateData:      true,
155                 CreateInVolume1: true,
156
157                 DeleteLocator: TestHash, // locator 1
158
159                 ExpectLocator1: false,
160                 ExpectLocator2: true,
161         }
162         performTrashWorkerTest(testData, t)
163 }
164
165 /* Allow default Trash Life time to be used. Thus, the newly created block
166    will not be deleted because its Mtime is within the trash life time.
167 */
168 func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
169         neverDelete = false
170         testData := TrashWorkerTestData{
171                 Locator1: TestHash,
172                 Block1:   TestBlock,
173
174                 Locator2: TestHash2,
175                 Block2:   TestBlock2,
176
177                 CreateData:      true,
178                 CreateInVolume1: true,
179
180                 UseTrashLifeTime: true,
181
182                 DeleteLocator: TestHash, // locator 1
183
184                 // Since trash life time is in effect, block won't be deleted.
185                 ExpectLocator1: true,
186                 ExpectLocator2: true,
187         }
188         performTrashWorkerTest(testData, t)
189 }
190
191 /* Delete a block with matching mtime for locator in both volumes, but neverDelete is true,
192    so block won't be deleted.
193 */
194 func TestTrashWorkerIntegration_NeverDelete(t *testing.T) {
195         neverDelete = true
196         testData := TrashWorkerTestData{
197                 Locator1: TestHash,
198                 Block1:   TestBlock,
199
200                 Locator2: TestHash,
201                 Block2:   TestBlock,
202
203                 CreateData: true,
204
205                 DeleteLocator: TestHash,
206
207                 ExpectLocator1: true,
208                 ExpectLocator2: true,
209         }
210         performTrashWorkerTest(testData, t)
211 }
212
213 /* Perform the test */
214 func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
215         // Create Keep Volumes
216         KeepVM = MakeTestVolumeManager(2)
217         defer KeepVM.Close()
218
219         // Put test content
220         vols := KeepVM.AllWritable()
221         if testData.CreateData {
222                 vols[0].Put(testData.Locator1, testData.Block1)
223                 vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
224
225                 if testData.CreateInVolume1 {
226                         vols[0].Put(testData.Locator2, testData.Block2)
227                         vols[0].Put(testData.Locator2+".meta", []byte("metadata"))
228                 } else {
229                         vols[1].Put(testData.Locator2, testData.Block2)
230                         vols[1].Put(testData.Locator2+".meta", []byte("metadata"))
231                 }
232         }
233
234         oldBlockTime := time.Now().Add(-blobSignatureTTL - time.Minute)
235
236         // Create TrashRequest for the test
237         trashRequest := TrashRequest{
238                 Locator:    testData.DeleteLocator,
239                 BlockMtime: oldBlockTime.UnixNano(),
240         }
241
242         // Run trash worker and put the trashRequest on trashq
243         trashList := list.New()
244         trashList.PushBack(trashRequest)
245         trashq = NewWorkQueue()
246         defer trashq.Close()
247
248         if !testData.UseTrashLifeTime {
249                 // Trash worker would not delete block if its Mtime is
250                 // within trash life time. Back-date the block to
251                 // allow the deletion to succeed.
252                 for _, v := range vols {
253                         v.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
254                         if testData.DifferentMtimes {
255                                 oldBlockTime = oldBlockTime.Add(time.Second)
256                         }
257                 }
258         }
259         go RunTrashWorker(trashq)
260
261         // Install gate so all local operations block until we say go
262         gate := make(chan struct{})
263         for _, v := range vols {
264                 v.(*MockVolume).Gate = gate
265         }
266
267         assertStatusItem := func(k string, expect float64) {
268                 if v := getStatusItem("TrashQueue", k); v != expect {
269                         t.Errorf("Got %s %v, expected %v", k, v, expect)
270                 }
271         }
272
273         assertStatusItem("InProgress", 0)
274         assertStatusItem("Queued", 0)
275
276         listLen := trashList.Len()
277         trashq.ReplaceQueue(trashList)
278
279         // Wait for worker to take request(s)
280         expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
281
282         // Ensure status.json also reports work is happening
283         assertStatusItem("InProgress", float64(1))
284         assertStatusItem("Queued", float64(listLen-1))
285
286         // Let worker proceed
287         close(gate)
288
289         // Wait for worker to finish
290         expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
291
292         // Verify Locator1 to be un/deleted as expected
293         buf := make([]byte, BlockSize)
294         size, err := GetBlock(testData.Locator1, buf, nil)
295         if testData.ExpectLocator1 {
296                 if size == 0 || err != nil {
297                         t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
298                 }
299         } else {
300                 if size > 0 || err == nil {
301                         t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
302                 }
303         }
304
305         // Verify Locator2 to be un/deleted as expected
306         if testData.Locator1 != testData.Locator2 {
307                 size, err = GetBlock(testData.Locator2, buf, nil)
308                 if testData.ExpectLocator2 {
309                         if size == 0 || err != nil {
310                                 t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
311                         }
312                 } else {
313                         if size > 0 || err == nil {
314                                 t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
315                         }
316                 }
317         }
318
319         // The DifferentMtimes test puts the same locator in two
320         // different volumes, but only one copy has an Mtime matching
321         // the trash request.
322         if testData.DifferentMtimes {
323                 locatorFoundIn := 0
324                 for _, volume := range KeepVM.AllReadable() {
325                         buf := make([]byte, BlockSize)
326                         if _, err := volume.Get(testData.Locator1, buf); err == nil {
327                                 locatorFoundIn = locatorFoundIn + 1
328                         }
329                 }
330                 if locatorFoundIn != 1 {
331                         t.Errorf("Found %d copies of %s, expected 1", locatorFoundIn, testData.Locator1)
332                 }
333         }
334 }