Merge branch '9897-log-block-prefetch-worker-exceptions' closes #9897
[arvados.git] / services / keepstore / volume_generic_test.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "crypto/md5"
7         "fmt"
8         "os"
9         "regexp"
10         "sort"
11         "strconv"
12         "strings"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
17 )
18
19 type TB interface {
20         Error(args ...interface{})
21         Errorf(format string, args ...interface{})
22         Fail()
23         FailNow()
24         Failed() bool
25         Fatal(args ...interface{})
26         Fatalf(format string, args ...interface{})
27         Log(args ...interface{})
28         Logf(format string, args ...interface{})
29 }
30
31 // A TestableVolumeFactory returns a new TestableVolume. The factory
32 // function, and the TestableVolume it returns, can use "t" to write
33 // logs, fail the current test, etc.
34 type TestableVolumeFactory func(t TB) TestableVolume
35
36 // DoGenericVolumeTests runs a set of tests that every TestableVolume
37 // is expected to pass. It calls factory to create a new TestableVolume
38 // for each test case, to avoid leaking state between tests.
39 func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
40         testGet(t, factory)
41         testGetNoSuchBlock(t, factory)
42
43         testCompareNonexistent(t, factory)
44         testCompareSameContent(t, factory, TestHash, TestBlock)
45         testCompareSameContent(t, factory, EmptyHash, EmptyBlock)
46         testCompareWithCollision(t, factory, TestHash, TestBlock, []byte("baddata"))
47         testCompareWithCollision(t, factory, TestHash, TestBlock, EmptyBlock)
48         testCompareWithCollision(t, factory, EmptyHash, EmptyBlock, TestBlock)
49         testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, []byte("baddata"))
50         testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, EmptyBlock)
51         testCompareWithCorruptStoredData(t, factory, EmptyHash, EmptyBlock, []byte("baddata"))
52
53         testPutBlockWithSameContent(t, factory, TestHash, TestBlock)
54         testPutBlockWithSameContent(t, factory, EmptyHash, EmptyBlock)
55         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], arvadostest.MD5CollisionData[1])
56         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, EmptyBlock, arvadostest.MD5CollisionData[0])
57         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], EmptyBlock)
58         testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, arvadostest.MD5CollisionData[0])
59         testPutMultipleBlocks(t, factory)
60
61         testPutAndTouch(t, factory)
62         testTouchNoSuchBlock(t, factory)
63
64         testMtimeNoSuchBlock(t, factory)
65
66         testIndexTo(t, factory)
67
68         testDeleteNewBlock(t, factory)
69         testDeleteOldBlock(t, factory)
70         testDeleteNoSuchBlock(t, factory)
71
72         testStatus(t, factory)
73
74         testString(t, factory)
75
76         testUpdateReadOnly(t, factory)
77
78         testGetConcurrent(t, factory)
79         testPutConcurrent(t, factory)
80
81         testPutFullBlock(t, factory)
82
83         testTrashUntrash(t, factory)
84         testTrashEmptyTrashUntrash(t, factory)
85 }
86
87 // Put a test block, get it and verify content
88 // Test should pass for both writable and read-only volumes
89 func testGet(t TB, factory TestableVolumeFactory) {
90         v := factory(t)
91         defer v.Teardown()
92
93         v.PutRaw(TestHash, TestBlock)
94
95         buf := make([]byte, BlockSize)
96         n, err := v.Get(context.Background(), TestHash, buf)
97         if err != nil {
98                 t.Fatal(err)
99         }
100
101         if bytes.Compare(buf[:n], TestBlock) != 0 {
102                 t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
103         }
104 }
105
106 // Invoke get on a block that does not exist in volume; should result in error
107 // Test should pass for both writable and read-only volumes
108 func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
109         v := factory(t)
110         defer v.Teardown()
111
112         buf := make([]byte, BlockSize)
113         if _, err := v.Get(context.Background(), TestHash2, buf); err == nil {
114                 t.Errorf("Expected error while getting non-existing block %v", TestHash2)
115         }
116 }
117
118 // Compare() should return os.ErrNotExist if the block does not exist.
119 // Otherwise, writing new data causes CompareAndTouch() to generate
120 // error logs even though everything is working fine.
121 func testCompareNonexistent(t TB, factory TestableVolumeFactory) {
122         v := factory(t)
123         defer v.Teardown()
124
125         err := v.Compare(context.Background(), TestHash, TestBlock)
126         if err != os.ErrNotExist {
127                 t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
128         }
129 }
130
131 // Put a test block and compare the locator with same content
132 // Test should pass for both writable and read-only volumes
133 func testCompareSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
134         v := factory(t)
135         defer v.Teardown()
136
137         v.PutRaw(testHash, testData)
138
139         // Compare the block locator with same content
140         err := v.Compare(context.Background(), testHash, testData)
141         if err != nil {
142                 t.Errorf("Got err %q, expected nil", err)
143         }
144 }
145
146 // Test behavior of Compare() when stored data matches expected
147 // checksum but differs from new data we need to store. Requires
148 // testHash = md5(testDataA).
149 //
150 // Test should pass for both writable and read-only volumes
151 func testCompareWithCollision(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
152         v := factory(t)
153         defer v.Teardown()
154
155         v.PutRaw(testHash, testDataA)
156
157         // Compare the block locator with different content; collision
158         err := v.Compare(context.Background(), TestHash, testDataB)
159         if err == nil {
160                 t.Errorf("Got err nil, expected error due to collision")
161         }
162 }
163
164 // Test behavior of Compare() when stored data has become
165 // corrupted. Requires testHash = md5(testDataA) != md5(testDataB).
166 //
167 // Test should pass for both writable and read-only volumes
168 func testCompareWithCorruptStoredData(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
169         v := factory(t)
170         defer v.Teardown()
171
172         v.PutRaw(TestHash, testDataB)
173
174         err := v.Compare(context.Background(), testHash, testDataA)
175         if err == nil || err == CollisionError {
176                 t.Errorf("Got err %+v, expected non-collision error", err)
177         }
178 }
179
180 // Put a block and put again with same content
181 // Test is intended for only writable volumes
182 func testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
183         v := factory(t)
184         defer v.Teardown()
185
186         if v.Writable() == false {
187                 return
188         }
189
190         err := v.Put(context.Background(), testHash, testData)
191         if err != nil {
192                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
193         }
194
195         err = v.Put(context.Background(), testHash, testData)
196         if err != nil {
197                 t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
198         }
199 }
200
201 // Put a block and put again with different content
202 // Test is intended for only writable volumes
203 func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
204         v := factory(t)
205         defer v.Teardown()
206
207         if v.Writable() == false {
208                 return
209         }
210
211         v.PutRaw(testHash, testDataA)
212
213         putErr := v.Put(context.Background(), testHash, testDataB)
214         buf := make([]byte, BlockSize)
215         n, getErr := v.Get(context.Background(), testHash, buf)
216         if putErr == nil {
217                 // Put must not return a nil error unless it has
218                 // overwritten the existing data.
219                 if bytes.Compare(buf[:n], testDataB) != 0 {
220                         t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB)
221                 }
222         } else {
223                 // It is permissible for Put to fail, but it must
224                 // leave us with either the original data, the new
225                 // data, or nothing at all.
226                 if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 {
227                         t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB)
228                 }
229         }
230 }
231
232 // Put and get multiple blocks
233 // Test is intended for only writable volumes
234 func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
235         v := factory(t)
236         defer v.Teardown()
237
238         if v.Writable() == false {
239                 return
240         }
241
242         err := v.Put(context.Background(), TestHash, TestBlock)
243         if err != nil {
244                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
245         }
246
247         err = v.Put(context.Background(), TestHash2, TestBlock2)
248         if err != nil {
249                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
250         }
251
252         err = v.Put(context.Background(), TestHash3, TestBlock3)
253         if err != nil {
254                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
255         }
256
257         data := make([]byte, BlockSize)
258         n, err := v.Get(context.Background(), TestHash, data)
259         if err != nil {
260                 t.Error(err)
261         } else {
262                 if bytes.Compare(data[:n], TestBlock) != 0 {
263                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock)
264                 }
265         }
266
267         n, err = v.Get(context.Background(), TestHash2, data)
268         if err != nil {
269                 t.Error(err)
270         } else {
271                 if bytes.Compare(data[:n], TestBlock2) != 0 {
272                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2)
273                 }
274         }
275
276         n, err = v.Get(context.Background(), TestHash3, data)
277         if err != nil {
278                 t.Error(err)
279         } else {
280                 if bytes.Compare(data[:n], TestBlock3) != 0 {
281                         t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3)
282                 }
283         }
284 }
285
286 // testPutAndTouch
287 //   Test that when applying PUT to a block that already exists,
288 //   the block's modification time is updated.
289 // Test is intended for only writable volumes
290 func testPutAndTouch(t TB, factory TestableVolumeFactory) {
291         v := factory(t)
292         defer v.Teardown()
293
294         if v.Writable() == false {
295                 return
296         }
297
298         if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
299                 t.Error(err)
300         }
301
302         // We'll verify { t0 < threshold < t1 }, where t0 is the
303         // existing block's timestamp on disk before Put() and t1 is
304         // its timestamp after Put().
305         threshold := time.Now().Add(-time.Second)
306
307         // Set the stored block's mtime far enough in the past that we
308         // can see the difference between "timestamp didn't change"
309         // and "timestamp granularity is too low".
310         v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
311
312         // Make sure v.Mtime() agrees the above Utime really worked.
313         if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
314                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
315         }
316
317         // Write the same block again.
318         if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
319                 t.Error(err)
320         }
321
322         // Verify threshold < t1
323         if t1, err := v.Mtime(TestHash); err != nil {
324                 t.Error(err)
325         } else if t1.Before(threshold) {
326                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
327         }
328 }
329
330 // Touching a non-existing block should result in error.
331 // Test should pass for both writable and read-only volumes
332 func testTouchNoSuchBlock(t TB, factory TestableVolumeFactory) {
333         v := factory(t)
334         defer v.Teardown()
335
336         if err := v.Touch(TestHash); err == nil {
337                 t.Error("Expected error when attempted to touch a non-existing block")
338         }
339 }
340
341 // Invoking Mtime on a non-existing block should result in error.
342 // Test should pass for both writable and read-only volumes
343 func testMtimeNoSuchBlock(t TB, factory TestableVolumeFactory) {
344         v := factory(t)
345         defer v.Teardown()
346
347         if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
348                 t.Error("Expected error when updating Mtime on a non-existing block")
349         }
350 }
351
352 // Put a few blocks and invoke IndexTo with:
353 // * no prefix
354 // * with a prefix
355 // * with no such prefix
356 // Test should pass for both writable and read-only volumes
357 func testIndexTo(t TB, factory TestableVolumeFactory) {
358         v := factory(t)
359         defer v.Teardown()
360
361         // minMtime and maxMtime are the minimum and maximum
362         // acceptable values the index can report for our test
363         // blocks. 1-second precision is acceptable.
364         minMtime := time.Now().UTC().UnixNano()
365         minMtime -= minMtime % 1e9
366
367         v.PutRaw(TestHash, TestBlock)
368         v.PutRaw(TestHash2, TestBlock2)
369         v.PutRaw(TestHash3, TestBlock3)
370
371         maxMtime := time.Now().UTC().UnixNano()
372         if maxMtime%1e9 > 0 {
373                 maxMtime -= maxMtime % 1e9
374                 maxMtime += 1e9
375         }
376
377         // Blocks whose names aren't Keep hashes should be omitted from
378         // index
379         v.PutRaw("fffffffffnotreallyahashfffffffff", nil)
380         v.PutRaw("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
381         v.PutRaw("f0000000000000000000000000000000f", nil)
382         v.PutRaw("f00", nil)
383
384         buf := new(bytes.Buffer)
385         v.IndexTo("", buf)
386         indexRows := strings.Split(string(buf.Bytes()), "\n")
387         sort.Strings(indexRows)
388         sortedIndex := strings.Join(indexRows, "\n")
389         m := regexp.MustCompile(
390                 `^\n` + TestHash + `\+\d+ (\d+)\n` +
391                         TestHash3 + `\+\d+ \d+\n` +
392                         TestHash2 + `\+\d+ \d+$`,
393         ).FindStringSubmatch(sortedIndex)
394         if m == nil {
395                 t.Errorf("Got index %q for empty prefix", sortedIndex)
396         } else {
397                 mtime, err := strconv.ParseInt(m[1], 10, 64)
398                 if err != nil {
399                         t.Error(err)
400                 } else if mtime < minMtime || mtime > maxMtime {
401                         t.Errorf("got %d for TestHash timestamp, expected %d <= t <= %d",
402                                 mtime, minMtime, maxMtime)
403                 }
404         }
405
406         for _, prefix := range []string{"f", "f15", "f15ac"} {
407                 buf = new(bytes.Buffer)
408                 v.IndexTo(prefix, buf)
409
410                 m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
411                 if err != nil {
412                         t.Error(err)
413                 } else if !m {
414                         t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
415                 }
416         }
417
418         for _, prefix := range []string{"zero", "zip", "zilch"} {
419                 buf = new(bytes.Buffer)
420                 err := v.IndexTo(prefix, buf)
421                 if err != nil {
422                         t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
423                 } else if buf.Len() != 0 {
424                         t.Errorf("Expected empty list for IndexTo with no such prefix %s", prefix)
425                 }
426         }
427 }
428
429 // Calling Delete() for a block immediately after writing it (not old enough)
430 // should neither delete the data nor return an error.
431 // Test is intended for only writable volumes
432 func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
433         v := factory(t)
434         defer v.Teardown()
435         theConfig.BlobSignatureTTL.Set("5m")
436
437         if v.Writable() == false {
438                 return
439         }
440
441         v.Put(context.Background(), TestHash, TestBlock)
442
443         if err := v.Trash(TestHash); err != nil {
444                 t.Error(err)
445         }
446         data := make([]byte, BlockSize)
447         n, err := v.Get(context.Background(), TestHash, data)
448         if err != nil {
449                 t.Error(err)
450         } else if bytes.Compare(data[:n], TestBlock) != 0 {
451                 t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock)
452         }
453 }
454
455 // Calling Delete() for a block with a timestamp older than
456 // BlobSignatureTTL seconds in the past should delete the data.
457 // Test is intended for only writable volumes
458 func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
459         v := factory(t)
460         defer v.Teardown()
461         theConfig.BlobSignatureTTL.Set("5m")
462
463         if v.Writable() == false {
464                 return
465         }
466
467         v.Put(context.Background(), TestHash, TestBlock)
468         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
469
470         if err := v.Trash(TestHash); err != nil {
471                 t.Error(err)
472         }
473         data := make([]byte, BlockSize)
474         if _, err := v.Get(context.Background(), TestHash, data); err == nil || !os.IsNotExist(err) {
475                 t.Errorf("os.IsNotExist(%v) should have been true", err)
476         }
477
478         _, err := v.Mtime(TestHash)
479         if err == nil || !os.IsNotExist(err) {
480                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
481         }
482
483         err = v.Compare(context.Background(), TestHash, TestBlock)
484         if err == nil || !os.IsNotExist(err) {
485                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
486         }
487
488         indexBuf := new(bytes.Buffer)
489         v.IndexTo("", indexBuf)
490         if strings.Contains(string(indexBuf.Bytes()), TestHash) {
491                 t.Fatalf("Found trashed block in IndexTo")
492         }
493
494         err = v.Touch(TestHash)
495         if err == nil || !os.IsNotExist(err) {
496                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
497         }
498 }
499
500 // Calling Delete() for a block that does not exist should result in error.
501 // Test should pass for both writable and read-only volumes
502 func testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
503         v := factory(t)
504         defer v.Teardown()
505
506         if err := v.Trash(TestHash2); err == nil {
507                 t.Errorf("Expected error when attempting to delete a non-existing block")
508         }
509 }
510
511 // Invoke Status and verify that VolumeStatus is returned
512 // Test should pass for both writable and read-only volumes
513 func testStatus(t TB, factory TestableVolumeFactory) {
514         v := factory(t)
515         defer v.Teardown()
516
517         // Get node status and make a basic sanity check.
518         status := v.Status()
519         if status.DeviceNum == 0 {
520                 t.Errorf("uninitialized device_num in %v", status)
521         }
522
523         if status.BytesFree == 0 {
524                 t.Errorf("uninitialized bytes_free in %v", status)
525         }
526
527         if status.BytesUsed == 0 {
528                 t.Errorf("uninitialized bytes_used in %v", status)
529         }
530 }
531
532 // Invoke String for the volume; expect non-empty result
533 // Test should pass for both writable and read-only volumes
534 func testString(t TB, factory TestableVolumeFactory) {
535         v := factory(t)
536         defer v.Teardown()
537
538         if id := v.String(); len(id) == 0 {
539                 t.Error("Got empty string for v.String()")
540         }
541 }
542
543 // Putting, updating, touching, and deleting blocks from a read-only volume result in error.
544 // Test is intended for only read-only volumes
545 func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
546         v := factory(t)
547         defer v.Teardown()
548
549         if v.Writable() == true {
550                 return
551         }
552
553         v.PutRaw(TestHash, TestBlock)
554         buf := make([]byte, BlockSize)
555
556         // Get from read-only volume should succeed
557         _, err := v.Get(context.Background(), TestHash, buf)
558         if err != nil {
559                 t.Errorf("got err %v, expected nil", err)
560         }
561
562         // Put a new block to read-only volume should result in error
563         err = v.Put(context.Background(), TestHash2, TestBlock2)
564         if err == nil {
565                 t.Errorf("Expected error when putting block in a read-only volume")
566         }
567         _, err = v.Get(context.Background(), TestHash2, buf)
568         if err == nil {
569                 t.Errorf("Expected error when getting block whose put in read-only volume failed")
570         }
571
572         // Touch a block in read-only volume should result in error
573         err = v.Touch(TestHash)
574         if err == nil {
575                 t.Errorf("Expected error when touching block in a read-only volume")
576         }
577
578         // Delete a block from a read-only volume should result in error
579         err = v.Trash(TestHash)
580         if err == nil {
581                 t.Errorf("Expected error when deleting block from a read-only volume")
582         }
583
584         // Overwriting an existing block in read-only volume should result in error
585         err = v.Put(context.Background(), TestHash, TestBlock)
586         if err == nil {
587                 t.Errorf("Expected error when putting block in a read-only volume")
588         }
589 }
590
591 // Launch concurrent Gets
592 // Test should pass for both writable and read-only volumes
593 func testGetConcurrent(t TB, factory TestableVolumeFactory) {
594         v := factory(t)
595         defer v.Teardown()
596
597         v.PutRaw(TestHash, TestBlock)
598         v.PutRaw(TestHash2, TestBlock2)
599         v.PutRaw(TestHash3, TestBlock3)
600
601         sem := make(chan int)
602         go func() {
603                 buf := make([]byte, BlockSize)
604                 n, err := v.Get(context.Background(), TestHash, buf)
605                 if err != nil {
606                         t.Errorf("err1: %v", err)
607                 }
608                 if bytes.Compare(buf[:n], TestBlock) != 0 {
609                         t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n]))
610                 }
611                 sem <- 1
612         }()
613
614         go func() {
615                 buf := make([]byte, BlockSize)
616                 n, err := v.Get(context.Background(), TestHash2, buf)
617                 if err != nil {
618                         t.Errorf("err2: %v", err)
619                 }
620                 if bytes.Compare(buf[:n], TestBlock2) != 0 {
621                         t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n]))
622                 }
623                 sem <- 1
624         }()
625
626         go func() {
627                 buf := make([]byte, BlockSize)
628                 n, err := v.Get(context.Background(), TestHash3, buf)
629                 if err != nil {
630                         t.Errorf("err3: %v", err)
631                 }
632                 if bytes.Compare(buf[:n], TestBlock3) != 0 {
633                         t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n]))
634                 }
635                 sem <- 1
636         }()
637
638         // Wait for all goroutines to finish
639         for done := 0; done < 3; done++ {
640                 <-sem
641         }
642 }
643
644 // Launch concurrent Puts
645 // Test is intended for only writable volumes
646 func testPutConcurrent(t TB, factory TestableVolumeFactory) {
647         v := factory(t)
648         defer v.Teardown()
649
650         if v.Writable() == false {
651                 return
652         }
653
654         sem := make(chan int)
655         go func(sem chan int) {
656                 err := v.Put(context.Background(), TestHash, TestBlock)
657                 if err != nil {
658                         t.Errorf("err1: %v", err)
659                 }
660                 sem <- 1
661         }(sem)
662
663         go func(sem chan int) {
664                 err := v.Put(context.Background(), TestHash2, TestBlock2)
665                 if err != nil {
666                         t.Errorf("err2: %v", err)
667                 }
668                 sem <- 1
669         }(sem)
670
671         go func(sem chan int) {
672                 err := v.Put(context.Background(), TestHash3, TestBlock3)
673                 if err != nil {
674                         t.Errorf("err3: %v", err)
675                 }
676                 sem <- 1
677         }(sem)
678
679         // Wait for all goroutines to finish
680         for done := 0; done < 3; done++ {
681                 <-sem
682         }
683
684         // Double check that we actually wrote the blocks we expected to write.
685         buf := make([]byte, BlockSize)
686         n, err := v.Get(context.Background(), TestHash, buf)
687         if err != nil {
688                 t.Errorf("Get #1: %v", err)
689         }
690         if bytes.Compare(buf[:n], TestBlock) != 0 {
691                 t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
692         }
693
694         n, err = v.Get(context.Background(), TestHash2, buf)
695         if err != nil {
696                 t.Errorf("Get #2: %v", err)
697         }
698         if bytes.Compare(buf[:n], TestBlock2) != 0 {
699                 t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
700         }
701
702         n, err = v.Get(context.Background(), TestHash3, buf)
703         if err != nil {
704                 t.Errorf("Get #3: %v", err)
705         }
706         if bytes.Compare(buf[:n], TestBlock3) != 0 {
707                 t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n]))
708         }
709 }
710
711 // Write and read back a full size block
712 func testPutFullBlock(t TB, factory TestableVolumeFactory) {
713         v := factory(t)
714         defer v.Teardown()
715
716         if !v.Writable() {
717                 return
718         }
719
720         wdata := make([]byte, BlockSize)
721         wdata[0] = 'a'
722         wdata[BlockSize-1] = 'z'
723         hash := fmt.Sprintf("%x", md5.Sum(wdata))
724         err := v.Put(context.Background(), hash, wdata)
725         if err != nil {
726                 t.Fatal(err)
727         }
728         buf := make([]byte, BlockSize)
729         n, err := v.Get(context.Background(), hash, buf)
730         if err != nil {
731                 t.Error(err)
732         }
733         if bytes.Compare(buf[:n], wdata) != 0 {
734                 t.Error("buf %+q != wdata %+q", buf[:n], wdata)
735         }
736 }
737
738 // With TrashLifetime != 0, perform:
739 // Trash an old block - which either raises ErrNotImplemented or succeeds
740 // Untrash -  which either raises ErrNotImplemented or succeeds
741 // Get - which must succeed
742 func testTrashUntrash(t TB, factory TestableVolumeFactory) {
743         v := factory(t)
744         defer v.Teardown()
745         defer func() {
746                 theConfig.TrashLifetime = 0
747         }()
748
749         theConfig.TrashLifetime.Set("1h")
750
751         // put block and backdate it
752         v.PutRaw(TestHash, TestBlock)
753         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
754
755         buf := make([]byte, BlockSize)
756         n, err := v.Get(context.Background(), TestHash, buf)
757         if err != nil {
758                 t.Fatal(err)
759         }
760         if bytes.Compare(buf[:n], TestBlock) != 0 {
761                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
762         }
763
764         // Trash
765         err = v.Trash(TestHash)
766         if v.Writable() == false {
767                 if err != MethodDisabledError {
768                         t.Fatal(err)
769                 }
770         } else if err != nil {
771                 if err != ErrNotImplemented {
772                         t.Fatal(err)
773                 }
774         } else {
775                 _, err = v.Get(context.Background(), TestHash, buf)
776                 if err == nil || !os.IsNotExist(err) {
777                         t.Errorf("os.IsNotExist(%v) should have been true", err)
778                 }
779
780                 // Untrash
781                 err = v.Untrash(TestHash)
782                 if err != nil {
783                         t.Fatal(err)
784                 }
785         }
786
787         // Get the block - after trash and untrash sequence
788         n, err = v.Get(context.Background(), TestHash, buf)
789         if err != nil {
790                 t.Fatal(err)
791         }
792         if bytes.Compare(buf[:n], TestBlock) != 0 {
793                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
794         }
795 }
796
797 func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
798         v := factory(t)
799         defer v.Teardown()
800         defer func(orig arvados.Duration) {
801                 theConfig.TrashLifetime = orig
802         }(theConfig.TrashLifetime)
803
804         checkGet := func() error {
805                 buf := make([]byte, BlockSize)
806                 n, err := v.Get(context.Background(), TestHash, buf)
807                 if err != nil {
808                         return err
809                 }
810                 if bytes.Compare(buf[:n], TestBlock) != 0 {
811                         t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
812                 }
813
814                 _, err = v.Mtime(TestHash)
815                 if err != nil {
816                         return err
817                 }
818
819                 err = v.Compare(context.Background(), TestHash, TestBlock)
820                 if err != nil {
821                         return err
822                 }
823
824                 indexBuf := new(bytes.Buffer)
825                 v.IndexTo("", indexBuf)
826                 if !strings.Contains(string(indexBuf.Bytes()), TestHash) {
827                         return os.ErrNotExist
828                 }
829
830                 return nil
831         }
832
833         // First set: EmptyTrash before reaching the trash deadline.
834
835         theConfig.TrashLifetime.Set("1h")
836
837         v.PutRaw(TestHash, TestBlock)
838         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
839
840         err := checkGet()
841         if err != nil {
842                 t.Fatal(err)
843         }
844
845         // Trash the block
846         err = v.Trash(TestHash)
847         if err == MethodDisabledError || err == ErrNotImplemented {
848                 // Skip the trash tests for read-only volumes, and
849                 // volume types that don't support TrashLifetime>0.
850                 return
851         }
852
853         err = checkGet()
854         if err == nil || !os.IsNotExist(err) {
855                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
856         }
857
858         err = v.Touch(TestHash)
859         if err == nil || !os.IsNotExist(err) {
860                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
861         }
862
863         v.EmptyTrash()
864
865         // Even after emptying the trash, we can untrash our block
866         // because the deadline hasn't been reached.
867         err = v.Untrash(TestHash)
868         if err != nil {
869                 t.Fatal(err)
870         }
871
872         err = checkGet()
873         if err != nil {
874                 t.Fatal(err)
875         }
876
877         err = v.Touch(TestHash)
878         if err != nil {
879                 t.Fatal(err)
880         }
881
882         // Because we Touch'ed, need to backdate again for next set of tests
883         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
884
885         // If the only block in the trash has already been untrashed,
886         // most volumes will fail a subsequent Untrash with a 404, but
887         // it's also acceptable for Untrash to succeed.
888         err = v.Untrash(TestHash)
889         if err != nil && !os.IsNotExist(err) {
890                 t.Fatalf("Expected success or os.IsNotExist(), but got: %v", err)
891         }
892
893         // The additional Untrash should not interfere with our
894         // already-untrashed copy.
895         err = checkGet()
896         if err != nil {
897                 t.Fatal(err)
898         }
899
900         // Untrash might have updated the timestamp, so backdate again
901         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
902
903         // Second set: EmptyTrash after the trash deadline has passed.
904
905         theConfig.TrashLifetime.Set("1ns")
906
907         err = v.Trash(TestHash)
908         if err != nil {
909                 t.Fatal(err)
910         }
911         err = checkGet()
912         if err == nil || !os.IsNotExist(err) {
913                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
914         }
915
916         // Even though 1ns has passed, we can untrash because we
917         // haven't called EmptyTrash yet.
918         err = v.Untrash(TestHash)
919         if err != nil {
920                 t.Fatal(err)
921         }
922         err = checkGet()
923         if err != nil {
924                 t.Fatal(err)
925         }
926
927         // Trash it again, and this time call EmptyTrash so it really
928         // goes away.
929         // (In Azure volumes, un/trash changes Mtime, so first backdate again)
930         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
931         err = v.Trash(TestHash)
932         err = checkGet()
933         if err == nil || !os.IsNotExist(err) {
934                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
935         }
936         v.EmptyTrash()
937
938         // Untrash won't find it
939         err = v.Untrash(TestHash)
940         if err == nil || !os.IsNotExist(err) {
941                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
942         }
943
944         // Get block won't find it
945         err = checkGet()
946         if err == nil || !os.IsNotExist(err) {
947                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
948         }
949
950         // Third set: If the same data block gets written again after
951         // being trashed, and then the trash gets emptied, the newer
952         // un-trashed copy doesn't get deleted along with it.
953
954         v.PutRaw(TestHash, TestBlock)
955         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
956
957         theConfig.TrashLifetime.Set("1ns")
958         err = v.Trash(TestHash)
959         if err != nil {
960                 t.Fatal(err)
961         }
962         err = checkGet()
963         if err == nil || !os.IsNotExist(err) {
964                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
965         }
966
967         v.PutRaw(TestHash, TestBlock)
968         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
969
970         // EmptyTrash should not delete the untrashed copy.
971         v.EmptyTrash()
972         err = checkGet()
973         if err != nil {
974                 t.Fatal(err)
975         }
976
977         // Fourth set: If the same data block gets trashed twice with
978         // different deadlines A and C, and then the trash is emptied
979         // at intermediate time B (A < B < C), it is still possible to
980         // untrash the block whose deadline is "C".
981
982         v.PutRaw(TestHash, TestBlock)
983         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
984
985         theConfig.TrashLifetime.Set("1ns")
986         err = v.Trash(TestHash)
987         if err != nil {
988                 t.Fatal(err)
989         }
990
991         v.PutRaw(TestHash, TestBlock)
992         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
993
994         theConfig.TrashLifetime.Set("1h")
995         err = v.Trash(TestHash)
996         if err != nil {
997                 t.Fatal(err)
998         }
999
1000         // EmptyTrash should not prevent us from recovering the
1001         // time.Hour ("C") trash
1002         v.EmptyTrash()
1003         err = v.Untrash(TestHash)
1004         if err != nil {
1005                 t.Fatal(err)
1006         }
1007         err = checkGet()
1008         if err != nil {
1009                 t.Fatal(err)
1010         }
1011 }