Merge branch 'master' into 7167-keep-rsync
[arvados.git] / services / keepstore / volume_generic_test.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "fmt"
7         "os"
8         "regexp"
9         "sort"
10         "strings"
11         "testing"
12         "time"
13 )
14
15 // A TestableVolumeFactory returns a new TestableVolume. The factory
16 // function, and the TestableVolume it returns, can use "t" to write
17 // logs, fail the current test, etc.
18 type TestableVolumeFactory func(t *testing.T) TestableVolume
19
20 // DoGenericVolumeTests runs a set of tests that every TestableVolume
21 // is expected to pass. It calls factory to create a new TestableVolume
22 // for each test case, to avoid leaking state between tests.
23 func DoGenericVolumeTests(t *testing.T, factory TestableVolumeFactory) {
24         testGet(t, factory)
25         testGetNoSuchBlock(t, factory)
26
27         testCompareNonexistent(t, factory)
28         testCompareSameContent(t, factory, TestHash, TestBlock)
29         testCompareSameContent(t, factory, EmptyHash, EmptyBlock)
30         testCompareWithCollision(t, factory, TestHash, TestBlock, []byte("baddata"))
31         testCompareWithCollision(t, factory, TestHash, TestBlock, EmptyBlock)
32         testCompareWithCollision(t, factory, EmptyHash, EmptyBlock, TestBlock)
33         testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, []byte("baddata"))
34         testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, EmptyBlock)
35         testCompareWithCorruptStoredData(t, factory, EmptyHash, EmptyBlock, []byte("baddata"))
36
37         testPutBlockWithSameContent(t, factory, TestHash, TestBlock)
38         testPutBlockWithSameContent(t, factory, EmptyHash, EmptyBlock)
39         testPutBlockWithDifferentContent(t, factory, TestHash, TestBlock, TestBlock2)
40         testPutBlockWithDifferentContent(t, factory, TestHash, EmptyBlock, TestBlock)
41         testPutBlockWithDifferentContent(t, factory, TestHash, TestBlock, EmptyBlock)
42         testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, TestBlock)
43         testPutMultipleBlocks(t, factory)
44
45         testPutAndTouch(t, factory)
46         testTouchNoSuchBlock(t, factory)
47
48         testMtimeNoSuchBlock(t, factory)
49
50         testIndexTo(t, factory)
51
52         testDeleteNewBlock(t, factory)
53         testDeleteOldBlock(t, factory)
54         testDeleteNoSuchBlock(t, factory)
55
56         testStatus(t, factory)
57
58         testString(t, factory)
59
60         testUpdateReadOnly(t, factory)
61
62         testGetConcurrent(t, factory)
63         testPutConcurrent(t, factory)
64
65         testPutFullBlock(t, factory)
66 }
67
68 // Put a test block, get it and verify content
69 // Test should pass for both writable and read-only volumes
70 func testGet(t *testing.T, factory TestableVolumeFactory) {
71         v := factory(t)
72         defer v.Teardown()
73
74         v.PutRaw(TestHash, TestBlock)
75
76         buf, err := v.Get(TestHash)
77         if err != nil {
78                 t.Fatal(err)
79         }
80
81         bufs.Put(buf)
82
83         if bytes.Compare(buf, TestBlock) != 0 {
84                 t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
85         }
86 }
87
88 // Invoke get on a block that does not exist in volume; should result in error
89 // Test should pass for both writable and read-only volumes
90 func testGetNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
91         v := factory(t)
92         defer v.Teardown()
93
94         if _, err := v.Get(TestHash2); err == nil {
95                 t.Errorf("Expected error while getting non-existing block %v", TestHash2)
96         }
97 }
98
99 // Compare() should return os.ErrNotExist if the block does not exist.
100 // Otherwise, writing new data causes CompareAndTouch() to generate
101 // error logs even though everything is working fine.
102 func testCompareNonexistent(t *testing.T, factory TestableVolumeFactory) {
103         v := factory(t)
104         defer v.Teardown()
105
106         err := v.Compare(TestHash, TestBlock)
107         if err != os.ErrNotExist {
108                 t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
109         }
110 }
111
112 // Put a test block and compare the locator with same content
113 // Test should pass for both writable and read-only volumes
114 func testCompareSameContent(t *testing.T, factory TestableVolumeFactory, testHash string, testData []byte) {
115         v := factory(t)
116         defer v.Teardown()
117
118         v.PutRaw(testHash, testData)
119
120         // Compare the block locator with same content
121         err := v.Compare(testHash, testData)
122         if err != nil {
123                 t.Errorf("Got err %q, expected nil", err)
124         }
125 }
126
127 // Test behavior of Compare() when stored data matches expected
128 // checksum but differs from new data we need to store. Requires
129 // testHash = md5(testDataA).
130 //
131 // Test should pass for both writable and read-only volumes
132 func testCompareWithCollision(t *testing.T, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
133         v := factory(t)
134         defer v.Teardown()
135
136         v.PutRaw(testHash, testDataA)
137
138         // Compare the block locator with different content; collision
139         err := v.Compare(TestHash, testDataB)
140         if err == nil {
141                 t.Errorf("Got err nil, expected error due to collision")
142         }
143 }
144
145 // Test behavior of Compare() when stored data has become
146 // corrupted. Requires testHash = md5(testDataA) != md5(testDataB).
147 //
148 // Test should pass for both writable and read-only volumes
149 func testCompareWithCorruptStoredData(t *testing.T, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
150         v := factory(t)
151         defer v.Teardown()
152
153         v.PutRaw(TestHash, testDataB)
154
155         err := v.Compare(testHash, testDataA)
156         if err == nil || err == CollisionError {
157                 t.Errorf("Got err %+v, expected non-collision error", err)
158         }
159 }
160
161 // Put a block and put again with same content
162 // Test is intended for only writable volumes
163 func testPutBlockWithSameContent(t *testing.T, factory TestableVolumeFactory, testHash string, testData []byte) {
164         v := factory(t)
165         defer v.Teardown()
166
167         if v.Writable() == false {
168                 return
169         }
170
171         err := v.Put(testHash, testData)
172         if err != nil {
173                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
174         }
175
176         err = v.Put(testHash, testData)
177         if err != nil {
178                 t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
179         }
180 }
181
182 // Put a block and put again with different content
183 // Test is intended for only writable volumes
184 func testPutBlockWithDifferentContent(t *testing.T, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
185         v := factory(t)
186         defer v.Teardown()
187
188         if v.Writable() == false {
189                 return
190         }
191
192         err := v.Put(testHash, testDataA)
193         if err != nil {
194                 t.Errorf("Got err putting block %q: %q, expected nil", testDataA, err)
195         }
196
197         putErr := v.Put(testHash, testDataB)
198         buf, getErr := v.Get(testHash)
199         if putErr == nil {
200                 // Put must not return a nil error unless it has
201                 // overwritten the existing data.
202                 if bytes.Compare(buf, testDataB) != 0 {
203                         t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
204                 }
205         } else {
206                 // It is permissible for Put to fail, but it must
207                 // leave us with either the original data, the new
208                 // data, or nothing at all.
209                 if getErr == nil && bytes.Compare(buf, testDataA) != 0 && bytes.Compare(buf, testDataB) != 0 {
210                         t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
211                 }
212         }
213         if getErr == nil {
214                 bufs.Put(buf)
215         }
216 }
217
218 // Put and get multiple blocks
219 // Test is intended for only writable volumes
220 func testPutMultipleBlocks(t *testing.T, factory TestableVolumeFactory) {
221         v := factory(t)
222         defer v.Teardown()
223
224         if v.Writable() == false {
225                 return
226         }
227
228         err := v.Put(TestHash, TestBlock)
229         if err != nil {
230                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
231         }
232
233         err = v.Put(TestHash2, TestBlock2)
234         if err != nil {
235                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
236         }
237
238         err = v.Put(TestHash3, TestBlock3)
239         if err != nil {
240                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
241         }
242
243         data, err := v.Get(TestHash)
244         if err != nil {
245                 t.Error(err)
246         } else {
247                 if bytes.Compare(data, TestBlock) != 0 {
248                         t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock)
249                 }
250                 bufs.Put(data)
251         }
252
253         data, err = v.Get(TestHash2)
254         if err != nil {
255                 t.Error(err)
256         } else {
257                 if bytes.Compare(data, TestBlock2) != 0 {
258                         t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2)
259                 }
260                 bufs.Put(data)
261         }
262
263         data, err = v.Get(TestHash3)
264         if err != nil {
265                 t.Error(err)
266         } else {
267                 if bytes.Compare(data, TestBlock3) != 0 {
268                         t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3)
269                 }
270                 bufs.Put(data)
271         }
272 }
273
274 // testPutAndTouch
275 //   Test that when applying PUT to a block that already exists,
276 //   the block's modification time is updated.
277 // Test is intended for only writable volumes
278 func testPutAndTouch(t *testing.T, factory TestableVolumeFactory) {
279         v := factory(t)
280         defer v.Teardown()
281
282         if v.Writable() == false {
283                 return
284         }
285
286         if err := v.Put(TestHash, TestBlock); err != nil {
287                 t.Error(err)
288         }
289
290         // We'll verify { t0 < threshold < t1 }, where t0 is the
291         // existing block's timestamp on disk before Put() and t1 is
292         // its timestamp after Put().
293         threshold := time.Now().Add(-time.Second)
294
295         // Set the stored block's mtime far enough in the past that we
296         // can see the difference between "timestamp didn't change"
297         // and "timestamp granularity is too low".
298         v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
299
300         // Make sure v.Mtime() agrees the above Utime really worked.
301         if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
302                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
303         }
304
305         // Write the same block again.
306         if err := v.Put(TestHash, TestBlock); err != nil {
307                 t.Error(err)
308         }
309
310         // Verify threshold < t1
311         if t1, err := v.Mtime(TestHash); err != nil {
312                 t.Error(err)
313         } else if t1.Before(threshold) {
314                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
315         }
316 }
317
318 // Touching a non-existing block should result in error.
319 // Test should pass for both writable and read-only volumes
320 func testTouchNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
321         v := factory(t)
322         defer v.Teardown()
323
324         if err := v.Touch(TestHash); err == nil {
325                 t.Error("Expected error when attempted to touch a non-existing block")
326         }
327 }
328
329 // Invoking Mtime on a non-existing block should result in error.
330 // Test should pass for both writable and read-only volumes
331 func testMtimeNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
332         v := factory(t)
333         defer v.Teardown()
334
335         if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
336                 t.Error("Expected error when updating Mtime on a non-existing block")
337         }
338 }
339
340 // Put a few blocks and invoke IndexTo with:
341 // * no prefix
342 // * with a prefix
343 // * with no such prefix
344 // Test should pass for both writable and read-only volumes
345 func testIndexTo(t *testing.T, factory TestableVolumeFactory) {
346         v := factory(t)
347         defer v.Teardown()
348
349         v.PutRaw(TestHash, TestBlock)
350         v.PutRaw(TestHash2, TestBlock2)
351         v.PutRaw(TestHash3, TestBlock3)
352
353         buf := new(bytes.Buffer)
354         v.IndexTo("", buf)
355         indexRows := strings.Split(string(buf.Bytes()), "\n")
356         sort.Strings(indexRows)
357         sortedIndex := strings.Join(indexRows, "\n")
358         m, err := regexp.MatchString(
359                 `^\n`+TestHash+`\+\d+ \d+\n`+
360                         TestHash3+`\+\d+ \d+\n`+
361                         TestHash2+`\+\d+ \d+$`,
362                 sortedIndex)
363         if err != nil {
364                 t.Error(err)
365         } else if !m {
366                 t.Errorf("Got index %q for empty prefix", sortedIndex)
367         }
368
369         for _, prefix := range []string{"f", "f15", "f15ac"} {
370                 buf = new(bytes.Buffer)
371                 v.IndexTo(prefix, buf)
372
373                 m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
374                 if err != nil {
375                         t.Error(err)
376                 } else if !m {
377                         t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
378                 }
379         }
380
381         for _, prefix := range []string{"zero", "zip", "zilch"} {
382                 buf = new(bytes.Buffer)
383                 v.IndexTo(prefix, buf)
384                 if err != nil {
385                         t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
386                 } else if buf.Len() != 0 {
387                         t.Errorf("Expected empty list for IndexTo with no such prefix %s", prefix)
388                 }
389         }
390 }
391
392 // Calling Delete() for a block immediately after writing it (not old enough)
393 // should neither delete the data nor return an error.
394 // Test is intended for only writable volumes
395 func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
396         v := factory(t)
397         defer v.Teardown()
398         blobSignatureTTL = 300 * time.Second
399
400         if v.Writable() == false {
401                 return
402         }
403
404         v.Put(TestHash, TestBlock)
405
406         if err := v.Delete(TestHash); err != nil {
407                 t.Error(err)
408         }
409         data, err := v.Get(TestHash)
410         if err != nil {
411                 t.Error(err)
412         } else {
413                 if bytes.Compare(data, TestBlock) != 0 {
414                         t.Errorf("Got data %+q, expected %+q", data, TestBlock)
415                 }
416                 bufs.Put(data)
417         }
418 }
419
420 // Calling Delete() for a block with a timestamp older than
421 // blobSignatureTTL seconds in the past should delete the data.
422 // Test is intended for only writable volumes
423 func testDeleteOldBlock(t *testing.T, factory TestableVolumeFactory) {
424         v := factory(t)
425         defer v.Teardown()
426         blobSignatureTTL = 300 * time.Second
427
428         if v.Writable() == false {
429                 return
430         }
431
432         v.Put(TestHash, TestBlock)
433         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
434
435         if err := v.Delete(TestHash); err != nil {
436                 t.Error(err)
437         }
438         if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
439                 t.Errorf("os.IsNotExist(%v) should have been true", err)
440         }
441 }
442
443 // Calling Delete() for a block that does not exist should result in error.
444 // Test should pass for both writable and read-only volumes
445 func testDeleteNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
446         v := factory(t)
447         defer v.Teardown()
448
449         if err := v.Delete(TestHash2); err == nil {
450                 t.Errorf("Expected error when attempting to delete a non-existing block")
451         }
452 }
453
454 // Invoke Status and verify that VolumeStatus is returned
455 // Test should pass for both writable and read-only volumes
456 func testStatus(t *testing.T, factory TestableVolumeFactory) {
457         v := factory(t)
458         defer v.Teardown()
459
460         // Get node status and make a basic sanity check.
461         status := v.Status()
462         if status.DeviceNum == 0 {
463                 t.Errorf("uninitialized device_num in %v", status)
464         }
465
466         if status.BytesFree == 0 {
467                 t.Errorf("uninitialized bytes_free in %v", status)
468         }
469
470         if status.BytesUsed == 0 {
471                 t.Errorf("uninitialized bytes_used in %v", status)
472         }
473 }
474
475 // Invoke String for the volume; expect non-empty result
476 // Test should pass for both writable and read-only volumes
477 func testString(t *testing.T, factory TestableVolumeFactory) {
478         v := factory(t)
479         defer v.Teardown()
480
481         if id := v.String(); len(id) == 0 {
482                 t.Error("Got empty string for v.String()")
483         }
484 }
485
486 // Putting, updating, touching, and deleting blocks from a read-only volume result in error.
487 // Test is intended for only read-only volumes
488 func testUpdateReadOnly(t *testing.T, factory TestableVolumeFactory) {
489         v := factory(t)
490         defer v.Teardown()
491
492         if v.Writable() == true {
493                 return
494         }
495
496         v.PutRaw(TestHash, TestBlock)
497
498         // Get from read-only volume should succeed
499         _, err := v.Get(TestHash)
500         if err != nil {
501                 t.Errorf("got err %v, expected nil", err)
502         }
503
504         // Put a new block to read-only volume should result in error
505         err = v.Put(TestHash2, TestBlock2)
506         if err == nil {
507                 t.Errorf("Expected error when putting block in a read-only volume")
508         }
509         _, err = v.Get(TestHash2)
510         if err == nil {
511                 t.Errorf("Expected error when getting block whose put in read-only volume failed")
512         }
513
514         // Touch a block in read-only volume should result in error
515         err = v.Touch(TestHash)
516         if err == nil {
517                 t.Errorf("Expected error when touching block in a read-only volume")
518         }
519
520         // Delete a block from a read-only volume should result in error
521         err = v.Delete(TestHash)
522         if err == nil {
523                 t.Errorf("Expected error when deleting block from a read-only volume")
524         }
525
526         // Overwriting an existing block in read-only volume should result in error
527         err = v.Put(TestHash, TestBlock)
528         if err == nil {
529                 t.Errorf("Expected error when putting block in a read-only volume")
530         }
531 }
532
533 // Launch concurrent Gets
534 // Test should pass for both writable and read-only volumes
535 func testGetConcurrent(t *testing.T, factory TestableVolumeFactory) {
536         v := factory(t)
537         defer v.Teardown()
538
539         v.PutRaw(TestHash, TestBlock)
540         v.PutRaw(TestHash2, TestBlock2)
541         v.PutRaw(TestHash3, TestBlock3)
542
543         sem := make(chan int)
544         go func(sem chan int) {
545                 buf, err := v.Get(TestHash)
546                 if err != nil {
547                         t.Errorf("err1: %v", err)
548                 }
549                 bufs.Put(buf)
550                 if bytes.Compare(buf, TestBlock) != 0 {
551                         t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf))
552                 }
553                 sem <- 1
554         }(sem)
555
556         go func(sem chan int) {
557                 buf, err := v.Get(TestHash2)
558                 if err != nil {
559                         t.Errorf("err2: %v", err)
560                 }
561                 bufs.Put(buf)
562                 if bytes.Compare(buf, TestBlock2) != 0 {
563                         t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf))
564                 }
565                 sem <- 1
566         }(sem)
567
568         go func(sem chan int) {
569                 buf, err := v.Get(TestHash3)
570                 if err != nil {
571                         t.Errorf("err3: %v", err)
572                 }
573                 bufs.Put(buf)
574                 if bytes.Compare(buf, TestBlock3) != 0 {
575                         t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf))
576                 }
577                 sem <- 1
578         }(sem)
579
580         // Wait for all goroutines to finish
581         for done := 0; done < 3; {
582                 done += <-sem
583         }
584 }
585
586 // Launch concurrent Puts
587 // Test is intended for only writable volumes
588 func testPutConcurrent(t *testing.T, factory TestableVolumeFactory) {
589         v := factory(t)
590         defer v.Teardown()
591
592         if v.Writable() == false {
593                 return
594         }
595
596         sem := make(chan int)
597         go func(sem chan int) {
598                 err := v.Put(TestHash, TestBlock)
599                 if err != nil {
600                         t.Errorf("err1: %v", err)
601                 }
602                 sem <- 1
603         }(sem)
604
605         go func(sem chan int) {
606                 err := v.Put(TestHash2, TestBlock2)
607                 if err != nil {
608                         t.Errorf("err2: %v", err)
609                 }
610                 sem <- 1
611         }(sem)
612
613         go func(sem chan int) {
614                 err := v.Put(TestHash3, TestBlock3)
615                 if err != nil {
616                         t.Errorf("err3: %v", err)
617                 }
618                 sem <- 1
619         }(sem)
620
621         // Wait for all goroutines to finish
622         for done := 0; done < 3; {
623                 done += <-sem
624         }
625
626         // Double check that we actually wrote the blocks we expected to write.
627         buf, err := v.Get(TestHash)
628         if err != nil {
629                 t.Errorf("Get #1: %v", err)
630         }
631         bufs.Put(buf)
632         if bytes.Compare(buf, TestBlock) != 0 {
633                 t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf))
634         }
635
636         buf, err = v.Get(TestHash2)
637         if err != nil {
638                 t.Errorf("Get #2: %v", err)
639         }
640         bufs.Put(buf)
641         if bytes.Compare(buf, TestBlock2) != 0 {
642                 t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf))
643         }
644
645         buf, err = v.Get(TestHash3)
646         if err != nil {
647                 t.Errorf("Get #3: %v", err)
648         }
649         bufs.Put(buf)
650         if bytes.Compare(buf, TestBlock3) != 0 {
651                 t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf))
652         }
653 }
654
655 // Write and read back a full size block
656 func testPutFullBlock(t *testing.T, factory TestableVolumeFactory) {
657         v := factory(t)
658         defer v.Teardown()
659
660         if !v.Writable() {
661                 return
662         }
663
664         wdata := make([]byte, BlockSize)
665         wdata[0] = 'a'
666         wdata[BlockSize-1] = 'z'
667         hash := fmt.Sprintf("%x", md5.Sum(wdata))
668         err := v.Put(hash, wdata)
669         if err != nil {
670                 t.Fatal(err)
671         }
672         rdata, err := v.Get(hash)
673         if err != nil {
674                 t.Error(err)
675         } else {
676                 defer bufs.Put(rdata)
677         }
678         if bytes.Compare(rdata, wdata) != 0 {
679                 t.Error("rdata != wdata")
680         }
681 }