7241: Use new CreateBlockBlobFromReader and SetBlobMetadata APIs for Put and Touch.
[arvados.git] / services / keepstore / volume_generic_test.go
1 package main
2
3 import (
4         "bytes"
5         "os"
6         "regexp"
7         "sort"
8         "strings"
9         "testing"
10         "time"
11 )
12
13 // A TestableVolumeFactory returns a new TestableVolume. The factory
14 // function, and the TestableVolume it returns, can use "t" to write
15 // logs, fail the current test, etc.
16 type TestableVolumeFactory func(t *testing.T) TestableVolume
17
18 // DoGenericVolumeTests runs a set of tests that every TestableVolume
19 // is expected to pass. It calls factory to create a new TestableVolume
20 // for each test case, to avoid leaking state between tests.
21 func DoGenericVolumeTests(t *testing.T, factory TestableVolumeFactory) {
22         testGet(t, factory)
23         testGetNoSuchBlock(t, factory)
24
25         testCompareSameContent(t, factory)
26         testCompareWithDifferentContent(t, factory)
27         testCompareWithBadData(t, factory)
28
29         testPutBlockWithSameContent(t, factory)
30         testPutBlockWithDifferentContent(t, factory)
31         testPutMultipleBlocks(t, factory)
32
33         testPutAndTouch(t, factory)
34         testTouchNoSuchBlock(t, factory)
35
36         testMtimeNoSuchBlock(t, factory)
37
38         testIndexTo(t, factory)
39
40         testDeleteNewBlock(t, factory)
41         testDeleteOldBlock(t, factory)
42         testDeleteNoSuchBlock(t, factory)
43
44         testStatus(t, factory)
45
46         testString(t, factory)
47
48         testUpdateReadOnly(t, factory)
49
50         testGetConcurrent(t, factory)
51         testPutConcurrent(t, factory)
52 }
53
54 // Put a test block, get it and verify content
55 // Test should pass for both writable and read-only volumes
56 func testGet(t *testing.T, factory TestableVolumeFactory) {
57         v := factory(t)
58         defer v.Teardown()
59
60         v.PutRaw(TestHash, TestBlock)
61
62         buf, err := v.Get(TestHash)
63         if err != nil {
64                 t.Fatal(err)
65         }
66
67         bufs.Put(buf)
68
69         if bytes.Compare(buf, TestBlock) != 0 {
70                 t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
71         }
72 }
73
74 // Invoke get on a block that does not exist in volume; should result in error
75 // Test should pass for both writable and read-only volumes
76 func testGetNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
77         v := factory(t)
78         defer v.Teardown()
79
80         if _, err := v.Get(TestHash2); err == nil {
81                 t.Errorf("Expected error while getting non-existing block %v", TestHash2)
82         }
83 }
84
85 // Put a test block and compare the locator with same content
86 // Test should pass for both writable and read-only volumes
87 func testCompareSameContent(t *testing.T, factory TestableVolumeFactory) {
88         v := factory(t)
89         defer v.Teardown()
90
91         v.PutRaw(TestHash, TestBlock)
92
93         // Compare the block locator with same content
94         err := v.Compare(TestHash, TestBlock)
95         if err != nil {
96                 t.Errorf("Got err %q, expected nil", err)
97         }
98 }
99
100 // Put a test block and compare the locator with a different content
101 // Expect error due to collision
102 // Test should pass for both writable and read-only volumes
103 func testCompareWithDifferentContent(t *testing.T, factory TestableVolumeFactory) {
104         v := factory(t)
105         defer v.Teardown()
106
107         v.PutRaw(TestHash, TestBlock)
108
109         // Compare the block locator with different content; collision
110         err := v.Compare(TestHash, []byte("baddata"))
111         if err == nil {
112                 t.Errorf("Expected error due to collision")
113         }
114 }
115
116 // Put a test block with bad data (hash does not match, but Put does not verify)
117 // Compare the locator with good data whose hash matches with locator
118 // Expect error due to corruption.
119 // Test should pass for both writable and read-only volumes
120 func testCompareWithBadData(t *testing.T, factory TestableVolumeFactory) {
121         v := factory(t)
122         defer v.Teardown()
123
124         v.PutRaw(TestHash, []byte("baddata"))
125
126         err := v.Compare(TestHash, TestBlock)
127         if err == nil {
128                 t.Errorf("Expected error due to corruption")
129         }
130 }
131
132 // Put a block and put again with same content
133 // Test is intended for only writable volumes
134 func testPutBlockWithSameContent(t *testing.T, factory TestableVolumeFactory) {
135         v := factory(t)
136         defer v.Teardown()
137
138         if v.Writable() == false {
139                 return
140         }
141
142         err := v.Put(TestHash, TestBlock)
143         if err != nil {
144                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
145         }
146
147         err = v.Put(TestHash, TestBlock)
148         if err != nil {
149                 t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
150         }
151 }
152
153 // Put a block and put again with different content
154 // Test is intended for only writable volumes
155 func testPutBlockWithDifferentContent(t *testing.T, factory TestableVolumeFactory) {
156         v := factory(t)
157         defer v.Teardown()
158
159         if v.Writable() == false {
160                 return
161         }
162
163         err := v.Put(TestHash, TestBlock)
164         if err != nil {
165                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
166         }
167
168         putErr := v.Put(TestHash, TestBlock2)
169         buf, getErr := v.Get(TestHash)
170         if putErr == nil {
171                 // Put must not return a nil error unless it has
172                 // overwritten the existing data.
173                 if bytes.Compare(buf, TestBlock2) != 0 {
174                         t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, TestBlock2)
175                 }
176         } else {
177                 // It is permissible for Put to fail, but it must
178                 // leave us with either the original data, the new
179                 // data, or nothing at all.
180                 if getErr == nil && bytes.Compare(buf, TestBlock) != 0 && bytes.Compare(buf, TestBlock2) != 0 {
181                         t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, TestBlock, TestBlock2)
182                 }
183         }
184         if getErr == nil {
185                 bufs.Put(buf)
186         }
187 }
188
189 // Put and get multiple blocks
190 // Test is intended for only writable volumes
191 func testPutMultipleBlocks(t *testing.T, factory TestableVolumeFactory) {
192         v := factory(t)
193         defer v.Teardown()
194
195         if v.Writable() == false {
196                 return
197         }
198
199         err := v.Put(TestHash, TestBlock)
200         if err != nil {
201                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
202         }
203
204         err = v.Put(TestHash2, TestBlock2)
205         if err != nil {
206                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
207         }
208
209         err = v.Put(TestHash3, TestBlock3)
210         if err != nil {
211                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
212         }
213
214         data, err := v.Get(TestHash)
215         if err != nil {
216                 t.Error(err)
217         } else {
218                 if bytes.Compare(data, TestBlock) != 0 {
219                         t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock)
220                 }
221                 bufs.Put(data)
222         }
223
224         data, err = v.Get(TestHash2)
225         if err != nil {
226                 t.Error(err)
227         } else {
228                 if bytes.Compare(data, TestBlock2) != 0 {
229                         t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2)
230                 }
231                 bufs.Put(data)
232         }
233
234         data, err = v.Get(TestHash3)
235         if err != nil {
236                 t.Error(err)
237         } else {
238                 if bytes.Compare(data, TestBlock3) != 0 {
239                         t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3)
240                 }
241                 bufs.Put(data)
242         }
243 }
244
245 // testPutAndTouch
246 //   Test that when applying PUT to a block that already exists,
247 //   the block's modification time is updated.
248 // Test is intended for only writable volumes
249 func testPutAndTouch(t *testing.T, factory TestableVolumeFactory) {
250         v := factory(t)
251         defer v.Teardown()
252
253         if v.Writable() == false {
254                 return
255         }
256
257         if err := v.Put(TestHash, TestBlock); err != nil {
258                 t.Error(err)
259         }
260
261         // We'll verify { t0 < threshold < t1 }, where t0 is the
262         // existing block's timestamp on disk before Put() and t1 is
263         // its timestamp after Put().
264         threshold := time.Now().Add(-time.Second)
265
266         // Set the stored block's mtime far enough in the past that we
267         // can see the difference between "timestamp didn't change"
268         // and "timestamp granularity is too low".
269         v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
270
271         // Make sure v.Mtime() agrees the above Utime really worked.
272         if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
273                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
274         }
275
276         // Write the same block again.
277         if err := v.Put(TestHash, TestBlock); err != nil {
278                 t.Error(err)
279         }
280
281         // Verify threshold < t1
282         if t1, err := v.Mtime(TestHash); err != nil {
283                 t.Error(err)
284         } else if t1.Before(threshold) {
285                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
286         }
287 }
288
289 // Touching a non-existing block should result in error.
290 // Test should pass for both writable and read-only volumes
291 func testTouchNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
292         v := factory(t)
293         defer v.Teardown()
294
295         if err := v.Touch(TestHash); err == nil {
296                 t.Error("Expected error when attempted to touch a non-existing block")
297         }
298 }
299
300 // Invoking Mtime on a non-existing block should result in error.
301 // Test should pass for both writable and read-only volumes
302 func testMtimeNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
303         v := factory(t)
304         defer v.Teardown()
305
306         if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
307                 t.Error("Expected error when updating Mtime on a non-existing block")
308         }
309 }
310
311 // Put a few blocks and invoke IndexTo with:
312 // * no prefix
313 // * with a prefix
314 // * with no such prefix
315 // Test should pass for both writable and read-only volumes
316 func testIndexTo(t *testing.T, factory TestableVolumeFactory) {
317         v := factory(t)
318         defer v.Teardown()
319
320         v.PutRaw(TestHash, TestBlock)
321         v.PutRaw(TestHash2, TestBlock2)
322         v.PutRaw(TestHash3, TestBlock3)
323
324         buf := new(bytes.Buffer)
325         v.IndexTo("", buf)
326         indexRows := strings.Split(string(buf.Bytes()), "\n")
327         sort.Strings(indexRows)
328         sortedIndex := strings.Join(indexRows, "\n")
329         m, err := regexp.MatchString(
330                 `^\n`+TestHash+`\+\d+ \d+\n`+
331                         TestHash3+`\+\d+ \d+\n`+
332                         TestHash2+`\+\d+ \d+$`,
333                 sortedIndex)
334         if err != nil {
335                 t.Error(err)
336         } else if !m {
337                 t.Errorf("Got index %q for empty prefix", sortedIndex)
338         }
339
340         for _, prefix := range []string{"f", "f15", "f15ac"} {
341                 buf = new(bytes.Buffer)
342                 v.IndexTo(prefix, buf)
343
344                 m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
345                 if err != nil {
346                         t.Error(err)
347                 } else if !m {
348                         t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
349                 }
350         }
351
352         for _, prefix := range []string{"zero", "zip", "zilch"} {
353                 buf = new(bytes.Buffer)
354                 v.IndexTo(prefix, buf)
355                 if err != nil {
356                         t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
357                 } else if buf.Len() != 0 {
358                         t.Errorf("Expected empty list for IndexTo with no such prefix %s", prefix)
359                 }
360         }
361 }
362
363 // Calling Delete() for a block immediately after writing it (not old enough)
364 // should neither delete the data nor return an error.
365 // Test is intended for only writable volumes
366 func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
367         v := factory(t)
368         defer v.Teardown()
369         blobSignatureTTL = 300 * time.Second
370
371         if v.Writable() == false {
372                 return
373         }
374
375         v.Put(TestHash, TestBlock)
376
377         if err := v.Delete(TestHash); err != nil {
378                 t.Error(err)
379         }
380         data, err := v.Get(TestHash)
381         if err != nil {
382                 t.Error(err)
383         } else {
384                 if bytes.Compare(data, TestBlock) != 0 {
385                         t.Errorf("Got data %+q, expected %+q", data, TestBlock)
386                 }
387                 bufs.Put(data)
388         }
389 }
390
391 // Calling Delete() for a block with a timestamp older than
392 // blobSignatureTTL seconds in the past should delete the data.
393 // Test is intended for only writable volumes
394 func testDeleteOldBlock(t *testing.T, factory TestableVolumeFactory) {
395         v := factory(t)
396         defer v.Teardown()
397         blobSignatureTTL = 300 * time.Second
398
399         if v.Writable() == false {
400                 return
401         }
402
403         v.Put(TestHash, TestBlock)
404         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
405
406         if err := v.Delete(TestHash); err != nil {
407                 t.Error(err)
408         }
409         if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
410                 t.Errorf("os.IsNotExist(%v) should have been true", err)
411         }
412 }
413
414 // Calling Delete() for a block that does not exist should result in error.
415 // Test should pass for both writable and read-only volumes
416 func testDeleteNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
417         v := factory(t)
418         defer v.Teardown()
419
420         if err := v.Delete(TestHash2); err == nil {
421                 t.Errorf("Expected error when attempting to delete a non-existing block")
422         }
423 }
424
425 // Invoke Status and verify that VolumeStatus is returned
426 // Test should pass for both writable and read-only volumes
427 func testStatus(t *testing.T, factory TestableVolumeFactory) {
428         v := factory(t)
429         defer v.Teardown()
430
431         // Get node status and make a basic sanity check.
432         status := v.Status()
433         if status.DeviceNum == 0 {
434                 t.Errorf("uninitialized device_num in %v", status)
435         }
436
437         if status.BytesFree == 0 {
438                 t.Errorf("uninitialized bytes_free in %v", status)
439         }
440
441         if status.BytesUsed == 0 {
442                 t.Errorf("uninitialized bytes_used in %v", status)
443         }
444 }
445
446 // Invoke String for the volume; expect non-empty result
447 // Test should pass for both writable and read-only volumes
448 func testString(t *testing.T, factory TestableVolumeFactory) {
449         v := factory(t)
450         defer v.Teardown()
451
452         if id := v.String(); len(id) == 0 {
453                 t.Error("Got empty string for v.String()")
454         }
455 }
456
457 // Putting, updating, touching, and deleting blocks from a read-only volume result in error.
458 // Test is intended for only read-only volumes
459 func testUpdateReadOnly(t *testing.T, factory TestableVolumeFactory) {
460         v := factory(t)
461         defer v.Teardown()
462
463         if v.Writable() == true {
464                 return
465         }
466
467         v.PutRaw(TestHash, TestBlock)
468
469         // Get from read-only volume should succeed
470         _, err := v.Get(TestHash)
471         if err != nil {
472                 t.Errorf("got err %v, expected nil", err)
473         }
474
475         // Put a new block to read-only volume should result in error
476         err = v.Put(TestHash2, TestBlock2)
477         if err == nil {
478                 t.Errorf("Expected error when putting block in a read-only volume")
479         }
480         _, err = v.Get(TestHash2)
481         if err == nil {
482                 t.Errorf("Expected error when getting block whose put in read-only volume failed")
483         }
484
485         // Touch a block in read-only volume should result in error
486         err = v.Touch(TestHash)
487         if err == nil {
488                 t.Errorf("Expected error when touching block in a read-only volume")
489         }
490
491         // Delete a block from a read-only volume should result in error
492         err = v.Delete(TestHash)
493         if err == nil {
494                 t.Errorf("Expected error when deleting block from a read-only volume")
495         }
496
497         // Overwriting an existing block in read-only volume should result in error
498         err = v.Put(TestHash, TestBlock)
499         if err == nil {
500                 t.Errorf("Expected error when putting block in a read-only volume")
501         }
502 }
503
504 // Launch concurrent Gets
505 // Test should pass for both writable and read-only volumes
506 func testGetConcurrent(t *testing.T, factory TestableVolumeFactory) {
507         v := factory(t)
508         defer v.Teardown()
509
510         v.PutRaw(TestHash, TestBlock)
511         v.PutRaw(TestHash2, TestBlock2)
512         v.PutRaw(TestHash3, TestBlock3)
513
514         sem := make(chan int)
515         go func(sem chan int) {
516                 buf, err := v.Get(TestHash)
517                 if err != nil {
518                         t.Errorf("err1: %v", err)
519                 }
520                 bufs.Put(buf)
521                 if bytes.Compare(buf, TestBlock) != 0 {
522                         t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf))
523                 }
524                 sem <- 1
525         }(sem)
526
527         go func(sem chan int) {
528                 buf, err := v.Get(TestHash2)
529                 if err != nil {
530                         t.Errorf("err2: %v", err)
531                 }
532                 bufs.Put(buf)
533                 if bytes.Compare(buf, TestBlock2) != 0 {
534                         t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf))
535                 }
536                 sem <- 1
537         }(sem)
538
539         go func(sem chan int) {
540                 buf, err := v.Get(TestHash3)
541                 if err != nil {
542                         t.Errorf("err3: %v", err)
543                 }
544                 bufs.Put(buf)
545                 if bytes.Compare(buf, TestBlock3) != 0 {
546                         t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf))
547                 }
548                 sem <- 1
549         }(sem)
550
551         // Wait for all goroutines to finish
552         for done := 0; done < 3; {
553                 done += <-sem
554         }
555 }
556
557 // Launch concurrent Puts
558 // Test is intended for only writable volumes
559 func testPutConcurrent(t *testing.T, factory TestableVolumeFactory) {
560         v := factory(t)
561         defer v.Teardown()
562
563         if v.Writable() == false {
564                 return
565         }
566
567         sem := make(chan int)
568         go func(sem chan int) {
569                 err := v.Put(TestHash, TestBlock)
570                 if err != nil {
571                         t.Errorf("err1: %v", err)
572                 }
573                 sem <- 1
574         }(sem)
575
576         go func(sem chan int) {
577                 err := v.Put(TestHash2, TestBlock2)
578                 if err != nil {
579                         t.Errorf("err2: %v", err)
580                 }
581                 sem <- 1
582         }(sem)
583
584         go func(sem chan int) {
585                 err := v.Put(TestHash3, TestBlock3)
586                 if err != nil {
587                         t.Errorf("err3: %v", err)
588                 }
589                 sem <- 1
590         }(sem)
591
592         // Wait for all goroutines to finish
593         for done := 0; done < 3; {
594                 done += <-sem
595         }
596
597         // Double check that we actually wrote the blocks we expected to write.
598         buf, err := v.Get(TestHash)
599         if err != nil {
600                 t.Errorf("Get #1: %v", err)
601         }
602         bufs.Put(buf)
603         if bytes.Compare(buf, TestBlock) != 0 {
604                 t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf))
605         }
606
607         buf, err = v.Get(TestHash2)
608         if err != nil {
609                 t.Errorf("Get #2: %v", err)
610         }
611         bufs.Put(buf)
612         if bytes.Compare(buf, TestBlock2) != 0 {
613                 t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf))
614         }
615
616         buf, err = v.Get(TestHash3)
617         if err != nil {
618                 t.Errorf("Get #3: %v", err)
619         }
620         bufs.Put(buf)
621         if bytes.Compare(buf, TestBlock3) != 0 {
622                 t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf))
623         }
624 }