9701: Use a collection.OrderedDict instead of a simple dict to hold bufferblocks...
[arvados.git] / services / keepstore / volume_generic_test.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "fmt"
7         "os"
8         "regexp"
9         "sort"
10         "strconv"
11         "strings"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
15 )
16
17 type TB interface {
18         Error(args ...interface{})
19         Errorf(format string, args ...interface{})
20         Fail()
21         FailNow()
22         Failed() bool
23         Fatal(args ...interface{})
24         Fatalf(format string, args ...interface{})
25         Log(args ...interface{})
26         Logf(format string, args ...interface{})
27 }
28
29 // A TestableVolumeFactory returns a new TestableVolume. The factory
30 // function, and the TestableVolume it returns, can use "t" to write
31 // logs, fail the current test, etc.
32 type TestableVolumeFactory func(t TB) TestableVolume
33
34 // DoGenericVolumeTests runs a set of tests that every TestableVolume
35 // is expected to pass. It calls factory to create a new TestableVolume
36 // for each test case, to avoid leaking state between tests.
37 func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
38         testGet(t, factory)
39         testGetNoSuchBlock(t, factory)
40
41         testCompareNonexistent(t, factory)
42         testCompareSameContent(t, factory, TestHash, TestBlock)
43         testCompareSameContent(t, factory, EmptyHash, EmptyBlock)
44         testCompareWithCollision(t, factory, TestHash, TestBlock, []byte("baddata"))
45         testCompareWithCollision(t, factory, TestHash, TestBlock, EmptyBlock)
46         testCompareWithCollision(t, factory, EmptyHash, EmptyBlock, TestBlock)
47         testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, []byte("baddata"))
48         testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, EmptyBlock)
49         testCompareWithCorruptStoredData(t, factory, EmptyHash, EmptyBlock, []byte("baddata"))
50
51         testPutBlockWithSameContent(t, factory, TestHash, TestBlock)
52         testPutBlockWithSameContent(t, factory, EmptyHash, EmptyBlock)
53         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], arvadostest.MD5CollisionData[1])
54         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, EmptyBlock, arvadostest.MD5CollisionData[0])
55         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], EmptyBlock)
56         testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, arvadostest.MD5CollisionData[0])
57         testPutMultipleBlocks(t, factory)
58
59         testPutAndTouch(t, factory)
60         testTouchNoSuchBlock(t, factory)
61
62         testMtimeNoSuchBlock(t, factory)
63
64         testIndexTo(t, factory)
65
66         testDeleteNewBlock(t, factory)
67         testDeleteOldBlock(t, factory)
68         testDeleteNoSuchBlock(t, factory)
69
70         testStatus(t, factory)
71
72         testString(t, factory)
73
74         testUpdateReadOnly(t, factory)
75
76         testGetConcurrent(t, factory)
77         testPutConcurrent(t, factory)
78
79         testPutFullBlock(t, factory)
80
81         testTrashUntrash(t, factory)
82         testTrashEmptyTrashUntrash(t, factory)
83 }
84
85 // Put a test block, get it and verify content
86 // Test should pass for both writable and read-only volumes
87 func testGet(t TB, factory TestableVolumeFactory) {
88         v := factory(t)
89         defer v.Teardown()
90
91         v.PutRaw(TestHash, TestBlock)
92
93         buf := make([]byte, BlockSize)
94         n, err := v.Get(TestHash, buf)
95         if err != nil {
96                 t.Fatal(err)
97         }
98
99         if bytes.Compare(buf[:n], TestBlock) != 0 {
100                 t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
101         }
102 }
103
104 // Invoke get on a block that does not exist in volume; should result in error
105 // Test should pass for both writable and read-only volumes
106 func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
107         v := factory(t)
108         defer v.Teardown()
109
110         buf := make([]byte, BlockSize)
111         if _, err := v.Get(TestHash2, buf); err == nil {
112                 t.Errorf("Expected error while getting non-existing block %v", TestHash2)
113         }
114 }
115
116 // Compare() should return os.ErrNotExist if the block does not exist.
117 // Otherwise, writing new data causes CompareAndTouch() to generate
118 // error logs even though everything is working fine.
119 func testCompareNonexistent(t TB, factory TestableVolumeFactory) {
120         v := factory(t)
121         defer v.Teardown()
122
123         err := v.Compare(TestHash, TestBlock)
124         if err != os.ErrNotExist {
125                 t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
126         }
127 }
128
129 // Put a test block and compare the locator with same content
130 // Test should pass for both writable and read-only volumes
131 func testCompareSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
132         v := factory(t)
133         defer v.Teardown()
134
135         v.PutRaw(testHash, testData)
136
137         // Compare the block locator with same content
138         err := v.Compare(testHash, testData)
139         if err != nil {
140                 t.Errorf("Got err %q, expected nil", err)
141         }
142 }
143
144 // Test behavior of Compare() when stored data matches expected
145 // checksum but differs from new data we need to store. Requires
146 // testHash = md5(testDataA).
147 //
148 // Test should pass for both writable and read-only volumes
149 func testCompareWithCollision(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
150         v := factory(t)
151         defer v.Teardown()
152
153         v.PutRaw(testHash, testDataA)
154
155         // Compare the block locator with different content; collision
156         err := v.Compare(TestHash, testDataB)
157         if err == nil {
158                 t.Errorf("Got err nil, expected error due to collision")
159         }
160 }
161
162 // Test behavior of Compare() when stored data has become
163 // corrupted. Requires testHash = md5(testDataA) != md5(testDataB).
164 //
165 // Test should pass for both writable and read-only volumes
166 func testCompareWithCorruptStoredData(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
167         v := factory(t)
168         defer v.Teardown()
169
170         v.PutRaw(TestHash, testDataB)
171
172         err := v.Compare(testHash, testDataA)
173         if err == nil || err == CollisionError {
174                 t.Errorf("Got err %+v, expected non-collision error", err)
175         }
176 }
177
178 // Put a block and put again with same content
179 // Test is intended for only writable volumes
180 func testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
181         v := factory(t)
182         defer v.Teardown()
183
184         if v.Writable() == false {
185                 return
186         }
187
188         err := v.Put(testHash, testData)
189         if err != nil {
190                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
191         }
192
193         err = v.Put(testHash, testData)
194         if err != nil {
195                 t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
196         }
197 }
198
199 // Put a block and put again with different content
200 // Test is intended for only writable volumes
201 func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
202         v := factory(t)
203         defer v.Teardown()
204
205         if v.Writable() == false {
206                 return
207         }
208
209         v.PutRaw(testHash, testDataA)
210
211         putErr := v.Put(testHash, testDataB)
212         buf := make([]byte, BlockSize)
213         n, getErr := v.Get(testHash, buf)
214         if putErr == nil {
215                 // Put must not return a nil error unless it has
216                 // overwritten the existing data.
217                 if bytes.Compare(buf[:n], testDataB) != 0 {
218                         t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB)
219                 }
220         } else {
221                 // It is permissible for Put to fail, but it must
222                 // leave us with either the original data, the new
223                 // data, or nothing at all.
224                 if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 {
225                         t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB)
226                 }
227         }
228 }
229
230 // Put and get multiple blocks
231 // Test is intended for only writable volumes
232 func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
233         v := factory(t)
234         defer v.Teardown()
235
236         if v.Writable() == false {
237                 return
238         }
239
240         err := v.Put(TestHash, TestBlock)
241         if err != nil {
242                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
243         }
244
245         err = v.Put(TestHash2, TestBlock2)
246         if err != nil {
247                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
248         }
249
250         err = v.Put(TestHash3, TestBlock3)
251         if err != nil {
252                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
253         }
254
255         data := make([]byte, BlockSize)
256         n, err := v.Get(TestHash, data)
257         if err != nil {
258                 t.Error(err)
259         } else {
260                 if bytes.Compare(data[:n], TestBlock) != 0 {
261                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock)
262                 }
263         }
264
265         n, err = v.Get(TestHash2, data)
266         if err != nil {
267                 t.Error(err)
268         } else {
269                 if bytes.Compare(data[:n], TestBlock2) != 0 {
270                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2)
271                 }
272         }
273
274         n, err = v.Get(TestHash3, data)
275         if err != nil {
276                 t.Error(err)
277         } else {
278                 if bytes.Compare(data[:n], TestBlock3) != 0 {
279                         t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3)
280                 }
281         }
282 }
283
284 // testPutAndTouch
285 //   Test that when applying PUT to a block that already exists,
286 //   the block's modification time is updated.
287 // Test is intended for only writable volumes
288 func testPutAndTouch(t TB, factory TestableVolumeFactory) {
289         v := factory(t)
290         defer v.Teardown()
291
292         if v.Writable() == false {
293                 return
294         }
295
296         if err := v.Put(TestHash, TestBlock); err != nil {
297                 t.Error(err)
298         }
299
300         // We'll verify { t0 < threshold < t1 }, where t0 is the
301         // existing block's timestamp on disk before Put() and t1 is
302         // its timestamp after Put().
303         threshold := time.Now().Add(-time.Second)
304
305         // Set the stored block's mtime far enough in the past that we
306         // can see the difference between "timestamp didn't change"
307         // and "timestamp granularity is too low".
308         v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
309
310         // Make sure v.Mtime() agrees the above Utime really worked.
311         if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
312                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
313         }
314
315         // Write the same block again.
316         if err := v.Put(TestHash, TestBlock); err != nil {
317                 t.Error(err)
318         }
319
320         // Verify threshold < t1
321         if t1, err := v.Mtime(TestHash); err != nil {
322                 t.Error(err)
323         } else if t1.Before(threshold) {
324                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
325         }
326 }
327
328 // Touching a non-existing block should result in error.
329 // Test should pass for both writable and read-only volumes
330 func testTouchNoSuchBlock(t TB, factory TestableVolumeFactory) {
331         v := factory(t)
332         defer v.Teardown()
333
334         if err := v.Touch(TestHash); err == nil {
335                 t.Error("Expected error when attempted to touch a non-existing block")
336         }
337 }
338
339 // Invoking Mtime on a non-existing block should result in error.
340 // Test should pass for both writable and read-only volumes
341 func testMtimeNoSuchBlock(t TB, factory TestableVolumeFactory) {
342         v := factory(t)
343         defer v.Teardown()
344
345         if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
346                 t.Error("Expected error when updating Mtime on a non-existing block")
347         }
348 }
349
350 // Put a few blocks and invoke IndexTo with:
351 // * no prefix
352 // * with a prefix
353 // * with no such prefix
354 // Test should pass for both writable and read-only volumes
355 func testIndexTo(t TB, factory TestableVolumeFactory) {
356         v := factory(t)
357         defer v.Teardown()
358
359         // minMtime and maxMtime are the minimum and maximum
360         // acceptable values the index can report for our test
361         // blocks. 1-second precision is acceptable.
362         minMtime := time.Now().UTC().UnixNano()
363         minMtime -= minMtime % 1e9
364
365         v.PutRaw(TestHash, TestBlock)
366         v.PutRaw(TestHash2, TestBlock2)
367         v.PutRaw(TestHash3, TestBlock3)
368
369         maxMtime := time.Now().UTC().UnixNano()
370         if maxMtime%1e9 > 0 {
371                 maxMtime -= maxMtime % 1e9
372                 maxMtime += 1e9
373         }
374
375         // Blocks whose names aren't Keep hashes should be omitted from
376         // index
377         v.PutRaw("fffffffffnotreallyahashfffffffff", nil)
378         v.PutRaw("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
379         v.PutRaw("f0000000000000000000000000000000f", nil)
380         v.PutRaw("f00", nil)
381
382         buf := new(bytes.Buffer)
383         v.IndexTo("", buf)
384         indexRows := strings.Split(string(buf.Bytes()), "\n")
385         sort.Strings(indexRows)
386         sortedIndex := strings.Join(indexRows, "\n")
387         m := regexp.MustCompile(
388                 `^\n` + TestHash + `\+\d+ (\d+)\n` +
389                         TestHash3 + `\+\d+ \d+\n` +
390                         TestHash2 + `\+\d+ \d+$`,
391         ).FindStringSubmatch(sortedIndex)
392         if m == nil {
393                 t.Errorf("Got index %q for empty prefix", sortedIndex)
394         } else {
395                 mtime, err := strconv.ParseInt(m[1], 10, 64)
396                 if err != nil {
397                         t.Error(err)
398                 } else if mtime < minMtime || mtime > maxMtime {
399                         t.Errorf("got %d for TestHash timestamp, expected %d <= t <= %d",
400                                 mtime, minMtime, maxMtime)
401                 }
402         }
403
404         for _, prefix := range []string{"f", "f15", "f15ac"} {
405                 buf = new(bytes.Buffer)
406                 v.IndexTo(prefix, buf)
407
408                 m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
409                 if err != nil {
410                         t.Error(err)
411                 } else if !m {
412                         t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
413                 }
414         }
415
416         for _, prefix := range []string{"zero", "zip", "zilch"} {
417                 buf = new(bytes.Buffer)
418                 err := v.IndexTo(prefix, buf)
419                 if err != nil {
420                         t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
421                 } else if buf.Len() != 0 {
422                         t.Errorf("Expected empty list for IndexTo with no such prefix %s", prefix)
423                 }
424         }
425 }
426
427 // Calling Delete() for a block immediately after writing it (not old enough)
428 // should neither delete the data nor return an error.
429 // Test is intended for only writable volumes
430 func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
431         v := factory(t)
432         defer v.Teardown()
433         blobSignatureTTL = 300 * time.Second
434
435         if v.Writable() == false {
436                 return
437         }
438
439         v.Put(TestHash, TestBlock)
440
441         if err := v.Trash(TestHash); err != nil {
442                 t.Error(err)
443         }
444         data := make([]byte, BlockSize)
445         n, err := v.Get(TestHash, data)
446         if err != nil {
447                 t.Error(err)
448         } else if bytes.Compare(data[:n], TestBlock) != 0 {
449                 t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock)
450         }
451 }
452
453 // Calling Delete() for a block with a timestamp older than
454 // blobSignatureTTL seconds in the past should delete the data.
455 // Test is intended for only writable volumes
456 func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
457         v := factory(t)
458         defer v.Teardown()
459         blobSignatureTTL = 300 * time.Second
460
461         if v.Writable() == false {
462                 return
463         }
464
465         v.Put(TestHash, TestBlock)
466         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
467
468         if err := v.Trash(TestHash); err != nil {
469                 t.Error(err)
470         }
471         data := make([]byte, BlockSize)
472         if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
473                 t.Errorf("os.IsNotExist(%v) should have been true", err)
474         }
475
476         _, err := v.Mtime(TestHash)
477         if err == nil || !os.IsNotExist(err) {
478                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
479         }
480
481         err = v.Compare(TestHash, TestBlock)
482         if err == nil || !os.IsNotExist(err) {
483                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
484         }
485
486         indexBuf := new(bytes.Buffer)
487         v.IndexTo("", indexBuf)
488         if strings.Contains(string(indexBuf.Bytes()), TestHash) {
489                 t.Fatalf("Found trashed block in IndexTo")
490         }
491
492         err = v.Touch(TestHash)
493         if err == nil || !os.IsNotExist(err) {
494                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
495         }
496 }
497
498 // Calling Delete() for a block that does not exist should result in error.
499 // Test should pass for both writable and read-only volumes
500 func testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
501         v := factory(t)
502         defer v.Teardown()
503
504         if err := v.Trash(TestHash2); err == nil {
505                 t.Errorf("Expected error when attempting to delete a non-existing block")
506         }
507 }
508
509 // Invoke Status and verify that VolumeStatus is returned
510 // Test should pass for both writable and read-only volumes
511 func testStatus(t TB, factory TestableVolumeFactory) {
512         v := factory(t)
513         defer v.Teardown()
514
515         // Get node status and make a basic sanity check.
516         status := v.Status()
517         if status.DeviceNum == 0 {
518                 t.Errorf("uninitialized device_num in %v", status)
519         }
520
521         if status.BytesFree == 0 {
522                 t.Errorf("uninitialized bytes_free in %v", status)
523         }
524
525         if status.BytesUsed == 0 {
526                 t.Errorf("uninitialized bytes_used in %v", status)
527         }
528 }
529
530 // Invoke String for the volume; expect non-empty result
531 // Test should pass for both writable and read-only volumes
532 func testString(t TB, factory TestableVolumeFactory) {
533         v := factory(t)
534         defer v.Teardown()
535
536         if id := v.String(); len(id) == 0 {
537                 t.Error("Got empty string for v.String()")
538         }
539 }
540
541 // Putting, updating, touching, and deleting blocks from a read-only volume result in error.
542 // Test is intended for only read-only volumes
543 func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
544         v := factory(t)
545         defer v.Teardown()
546
547         if v.Writable() == true {
548                 return
549         }
550
551         v.PutRaw(TestHash, TestBlock)
552         buf := make([]byte, BlockSize)
553
554         // Get from read-only volume should succeed
555         _, err := v.Get(TestHash, buf)
556         if err != nil {
557                 t.Errorf("got err %v, expected nil", err)
558         }
559
560         // Put a new block to read-only volume should result in error
561         err = v.Put(TestHash2, TestBlock2)
562         if err == nil {
563                 t.Errorf("Expected error when putting block in a read-only volume")
564         }
565         _, err = v.Get(TestHash2, buf)
566         if err == nil {
567                 t.Errorf("Expected error when getting block whose put in read-only volume failed")
568         }
569
570         // Touch a block in read-only volume should result in error
571         err = v.Touch(TestHash)
572         if err == nil {
573                 t.Errorf("Expected error when touching block in a read-only volume")
574         }
575
576         // Delete a block from a read-only volume should result in error
577         err = v.Trash(TestHash)
578         if err == nil {
579                 t.Errorf("Expected error when deleting block from a read-only volume")
580         }
581
582         // Overwriting an existing block in read-only volume should result in error
583         err = v.Put(TestHash, TestBlock)
584         if err == nil {
585                 t.Errorf("Expected error when putting block in a read-only volume")
586         }
587 }
588
589 // Launch concurrent Gets
590 // Test should pass for both writable and read-only volumes
591 func testGetConcurrent(t TB, factory TestableVolumeFactory) {
592         v := factory(t)
593         defer v.Teardown()
594
595         v.PutRaw(TestHash, TestBlock)
596         v.PutRaw(TestHash2, TestBlock2)
597         v.PutRaw(TestHash3, TestBlock3)
598
599         sem := make(chan int)
600         go func() {
601                 buf := make([]byte, BlockSize)
602                 n, err := v.Get(TestHash, buf)
603                 if err != nil {
604                         t.Errorf("err1: %v", err)
605                 }
606                 if bytes.Compare(buf[:n], TestBlock) != 0 {
607                         t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n]))
608                 }
609                 sem <- 1
610         }()
611
612         go func() {
613                 buf := make([]byte, BlockSize)
614                 n, err := v.Get(TestHash2, buf)
615                 if err != nil {
616                         t.Errorf("err2: %v", err)
617                 }
618                 if bytes.Compare(buf[:n], TestBlock2) != 0 {
619                         t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n]))
620                 }
621                 sem <- 1
622         }()
623
624         go func() {
625                 buf := make([]byte, BlockSize)
626                 n, err := v.Get(TestHash3, buf)
627                 if err != nil {
628                         t.Errorf("err3: %v", err)
629                 }
630                 if bytes.Compare(buf[:n], TestBlock3) != 0 {
631                         t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n]))
632                 }
633                 sem <- 1
634         }()
635
636         // Wait for all goroutines to finish
637         for done := 0; done < 3; done++ {
638                 <-sem
639         }
640 }
641
642 // Launch concurrent Puts
643 // Test is intended for only writable volumes
644 func testPutConcurrent(t TB, factory TestableVolumeFactory) {
645         v := factory(t)
646         defer v.Teardown()
647
648         if v.Writable() == false {
649                 return
650         }
651
652         sem := make(chan int)
653         go func(sem chan int) {
654                 err := v.Put(TestHash, TestBlock)
655                 if err != nil {
656                         t.Errorf("err1: %v", err)
657                 }
658                 sem <- 1
659         }(sem)
660
661         go func(sem chan int) {
662                 err := v.Put(TestHash2, TestBlock2)
663                 if err != nil {
664                         t.Errorf("err2: %v", err)
665                 }
666                 sem <- 1
667         }(sem)
668
669         go func(sem chan int) {
670                 err := v.Put(TestHash3, TestBlock3)
671                 if err != nil {
672                         t.Errorf("err3: %v", err)
673                 }
674                 sem <- 1
675         }(sem)
676
677         // Wait for all goroutines to finish
678         for done := 0; done < 3; done++ {
679                 <-sem
680         }
681
682         // Double check that we actually wrote the blocks we expected to write.
683         buf := make([]byte, BlockSize)
684         n, err := v.Get(TestHash, buf)
685         if err != nil {
686                 t.Errorf("Get #1: %v", err)
687         }
688         if bytes.Compare(buf[:n], TestBlock) != 0 {
689                 t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
690         }
691
692         n, err = v.Get(TestHash2, buf)
693         if err != nil {
694                 t.Errorf("Get #2: %v", err)
695         }
696         if bytes.Compare(buf[:n], TestBlock2) != 0 {
697                 t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
698         }
699
700         n, err = v.Get(TestHash3, buf)
701         if err != nil {
702                 t.Errorf("Get #3: %v", err)
703         }
704         if bytes.Compare(buf[:n], TestBlock3) != 0 {
705                 t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n]))
706         }
707 }
708
709 // Write and read back a full size block
710 func testPutFullBlock(t TB, factory TestableVolumeFactory) {
711         v := factory(t)
712         defer v.Teardown()
713
714         if !v.Writable() {
715                 return
716         }
717
718         wdata := make([]byte, BlockSize)
719         wdata[0] = 'a'
720         wdata[BlockSize-1] = 'z'
721         hash := fmt.Sprintf("%x", md5.Sum(wdata))
722         err := v.Put(hash, wdata)
723         if err != nil {
724                 t.Fatal(err)
725         }
726         buf := make([]byte, BlockSize)
727         n, err := v.Get(hash, buf)
728         if err != nil {
729                 t.Error(err)
730         }
731         if bytes.Compare(buf[:n], wdata) != 0 {
732                 t.Error("buf %+q != wdata %+q", buf[:n], wdata)
733         }
734 }
735
736 // With trashLifetime != 0, perform:
737 // Trash an old block - which either raises ErrNotImplemented or succeeds
738 // Untrash -  which either raises ErrNotImplemented or succeeds
739 // Get - which must succeed
740 func testTrashUntrash(t TB, factory TestableVolumeFactory) {
741         v := factory(t)
742         defer v.Teardown()
743         defer func() {
744                 trashLifetime = 0
745         }()
746
747         trashLifetime = 3600 * time.Second
748
749         // put block and backdate it
750         v.PutRaw(TestHash, TestBlock)
751         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
752
753         buf := make([]byte, BlockSize)
754         n, err := v.Get(TestHash, buf)
755         if err != nil {
756                 t.Fatal(err)
757         }
758         if bytes.Compare(buf[:n], TestBlock) != 0 {
759                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
760         }
761
762         // Trash
763         err = v.Trash(TestHash)
764         if v.Writable() == false {
765                 if err != MethodDisabledError {
766                         t.Fatal(err)
767                 }
768         } else if err != nil {
769                 if err != ErrNotImplemented {
770                         t.Fatal(err)
771                 }
772         } else {
773                 _, err = v.Get(TestHash, buf)
774                 if err == nil || !os.IsNotExist(err) {
775                         t.Errorf("os.IsNotExist(%v) should have been true", err)
776                 }
777
778                 // Untrash
779                 err = v.Untrash(TestHash)
780                 if err != nil {
781                         t.Fatal(err)
782                 }
783         }
784
785         // Get the block - after trash and untrash sequence
786         n, err = v.Get(TestHash, buf)
787         if err != nil {
788                 t.Fatal(err)
789         }
790         if bytes.Compare(buf[:n], TestBlock) != 0 {
791                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
792         }
793 }
794
795 func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
796         v := factory(t)
797         defer v.Teardown()
798         defer func(orig time.Duration) {
799                 trashLifetime = orig
800         }(trashLifetime)
801
802         checkGet := func() error {
803                 buf := make([]byte, BlockSize)
804                 n, err := v.Get(TestHash, buf)
805                 if err != nil {
806                         return err
807                 }
808                 if bytes.Compare(buf[:n], TestBlock) != 0 {
809                         t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
810                 }
811
812                 _, err = v.Mtime(TestHash)
813                 if err != nil {
814                         return err
815                 }
816
817                 err = v.Compare(TestHash, TestBlock)
818                 if err != nil {
819                         return err
820                 }
821
822                 indexBuf := new(bytes.Buffer)
823                 v.IndexTo("", indexBuf)
824                 if !strings.Contains(string(indexBuf.Bytes()), TestHash) {
825                         return os.ErrNotExist
826                 }
827
828                 return nil
829         }
830
831         // First set: EmptyTrash before reaching the trash deadline.
832
833         trashLifetime = time.Hour
834
835         v.PutRaw(TestHash, TestBlock)
836         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
837
838         err := checkGet()
839         if err != nil {
840                 t.Fatal(err)
841         }
842
843         // Trash the block
844         err = v.Trash(TestHash)
845         if err == MethodDisabledError || err == ErrNotImplemented {
846                 // Skip the trash tests for read-only volumes, and
847                 // volume types that don't support trashLifetime>0.
848                 return
849         }
850
851         err = checkGet()
852         if err == nil || !os.IsNotExist(err) {
853                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
854         }
855
856         err = v.Touch(TestHash)
857         if err == nil || !os.IsNotExist(err) {
858                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
859         }
860
861         v.EmptyTrash()
862
863         // Even after emptying the trash, we can untrash our block
864         // because the deadline hasn't been reached.
865         err = v.Untrash(TestHash)
866         if err != nil {
867                 t.Fatal(err)
868         }
869
870         err = checkGet()
871         if err != nil {
872                 t.Fatal(err)
873         }
874
875         err = v.Touch(TestHash)
876         if err != nil {
877                 t.Fatal(err)
878         }
879
880         // Because we Touch'ed, need to backdate again for next set of tests
881         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
882
883         // If the only block in the trash has already been untrashed,
884         // most volumes will fail a subsequent Untrash with a 404, but
885         // it's also acceptable for Untrash to succeed.
886         err = v.Untrash(TestHash)
887         if err != nil && !os.IsNotExist(err) {
888                 t.Fatalf("Expected success or os.IsNotExist(), but got: %v", err)
889         }
890
891         // The additional Untrash should not interfere with our
892         // already-untrashed copy.
893         err = checkGet()
894         if err != nil {
895                 t.Fatal(err)
896         }
897
898         // Untrash might have updated the timestamp, so backdate again
899         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
900
901         // Second set: EmptyTrash after the trash deadline has passed.
902
903         trashLifetime = time.Nanosecond
904
905         err = v.Trash(TestHash)
906         if err != nil {
907                 t.Fatal(err)
908         }
909         err = checkGet()
910         if err == nil || !os.IsNotExist(err) {
911                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
912         }
913
914         // Even though 1ns has passed, we can untrash because we
915         // haven't called EmptyTrash yet.
916         err = v.Untrash(TestHash)
917         if err != nil {
918                 t.Fatal(err)
919         }
920         err = checkGet()
921         if err != nil {
922                 t.Fatal(err)
923         }
924
925         // Trash it again, and this time call EmptyTrash so it really
926         // goes away.
927         // (In Azure volumes, un/trash changes Mtime, so first backdate again)
928         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
929         err = v.Trash(TestHash)
930         err = checkGet()
931         if err == nil || !os.IsNotExist(err) {
932                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
933         }
934         v.EmptyTrash()
935
936         // Untrash won't find it
937         err = v.Untrash(TestHash)
938         if err == nil || !os.IsNotExist(err) {
939                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
940         }
941
942         // Get block won't find it
943         err = checkGet()
944         if err == nil || !os.IsNotExist(err) {
945                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
946         }
947
948         // Third set: If the same data block gets written again after
949         // being trashed, and then the trash gets emptied, the newer
950         // un-trashed copy doesn't get deleted along with it.
951
952         v.PutRaw(TestHash, TestBlock)
953         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
954
955         trashLifetime = time.Nanosecond
956         err = v.Trash(TestHash)
957         if err != nil {
958                 t.Fatal(err)
959         }
960         err = checkGet()
961         if err == nil || !os.IsNotExist(err) {
962                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
963         }
964
965         v.PutRaw(TestHash, TestBlock)
966         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
967
968         // EmptyTrash should not delete the untrashed copy.
969         v.EmptyTrash()
970         err = checkGet()
971         if err != nil {
972                 t.Fatal(err)
973         }
974
975         // Fourth set: If the same data block gets trashed twice with
976         // different deadlines A and C, and then the trash is emptied
977         // at intermediate time B (A < B < C), it is still possible to
978         // untrash the block whose deadline is "C".
979
980         v.PutRaw(TestHash, TestBlock)
981         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
982
983         trashLifetime = time.Nanosecond
984         err = v.Trash(TestHash)
985         if err != nil {
986                 t.Fatal(err)
987         }
988
989         v.PutRaw(TestHash, TestBlock)
990         v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
991
992         trashLifetime = time.Hour
993         err = v.Trash(TestHash)
994         if err != nil {
995                 t.Fatal(err)
996         }
997
998         // EmptyTrash should not prevent us from recovering the
999         // time.Hour ("C") trash
1000         v.EmptyTrash()
1001         err = v.Untrash(TestHash)
1002         if err != nil {
1003                 t.Fatal(err)
1004         }
1005         err = checkGet()
1006         if err != nil {
1007                 t.Fatal(err)
1008         }
1009 }