1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "git.arvados.org/arvados.git/sdk/go/arvadostest"
22 "git.arvados.org/arvados.git/sdk/go/ctxlog"
23 "github.com/prometheus/client_golang/prometheus"
24 dto "github.com/prometheus/client_model/go"
25 "github.com/sirupsen/logrus"
// Reporting methods in three families (Error, Fatal and Log), each
// paired with a format-string variant.
// NOTE(review): these signatures match the like-named methods of
// testing.TB; confirm that compatibility is the intent.
29 Error(args ...interface{})
30 Errorf(format string, args ...interface{})
34 Fatal(args ...interface{})
35 Fatalf(format string, args ...interface{})
36 Log(args ...interface{})
37 Logf(format string, args ...interface{})
40 // A TestableVolumeFactory returns a new TestableVolume. The factory
41 // function, and the TestableVolume it returns, can use "t" to write
42 // logs, fail the current test, etc.
//
// params carries the settings for the volume being created; within
// this file it is populated by genericVolumeSuite.newVolume.
43 type TestableVolumeFactory func(t TB, params newVolumeParams) TestableVolume
45 // DoGenericVolumeTests runs a set of tests that every TestableVolume
46 // is expected to pass. It calls factory to create a new TestableVolume
47 // for each test case, to avoid leaking state between tests.
//
// readonly is recorded in the suite's volume configuration
// (s.volume.ReadOnly) and is additionally passed explicitly to
// testMetrics and testTrashUntrash.
48 func DoGenericVolumeTests(t TB, readonly bool, factory TestableVolumeFactory) {
49 var s genericVolumeSuite
50 s.volume.ReadOnly = readonly
// Reading a block that was never written.
53 s.testGetNoSuchBlock(t, factory)
// Writing the same hash twice with identical content, for a regular
// block and for the empty block.
56 s.testPutBlockWithSameContent(t, factory, TestHash, TestBlock)
57 s.testPutBlockWithSameContent(t, factory, EmptyHash, EmptyBlock)
// Writing the same hash twice with different content. The cases mix
// the arvadostest.MD5CollisionData payloads and EmptyBlock in both
// orders.
// NOTE(review): by their names, MD5CollisionData[0] and [1] are
// distinct payloads sharing the hash MD5CollisionMD5; confirm in
// arvadostest.
58 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], arvadostest.MD5CollisionData[1])
59 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, EmptyBlock, arvadostest.MD5CollisionData[0])
60 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], EmptyBlock)
61 s.testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, arvadostest.MD5CollisionData[0])
62 s.testPutMultipleBlocks(t, factory)
// Modification-time behaviour.
64 s.testPutAndTouch(t, factory)
66 s.testTouchNoSuchBlock(t, factory)
68 s.testMtimeNoSuchBlock(t, factory)
// Index listing.
70 s.testIndex(t, factory)
// Trashing blocks that are new, old, or missing.
73 s.testDeleteNewBlock(t, factory)
74 s.testDeleteOldBlock(t, factory)
76 s.testDeleteNoSuchBlock(t, factory)
// Operation and byte counters.
78 s.testMetrics(t, readonly, factory)
// Concurrent reads and writes, then a full-size block.
80 s.testGetConcurrent(t, factory)
82 s.testPutConcurrent(t, factory)
83 s.testPutFullBlock(t, factory)
// Trash / untrash / empty-trash sequences.
84 s.testTrashUntrash(t, readonly, factory)
85 s.testTrashEmptyTrashUntrash(t, factory)
// genericVolumeSuite holds the fixtures shared by the generic volume
// tests. The fields below are assigned by setup; metrics and
// bufferPool are handed to the volume factory by newVolume.
89 type genericVolumeSuite struct {
// cluster is assigned from testCluster(t) in setup.
90 cluster *arvados.Cluster
// logger is assigned from ctxlog.TestLogger(t) in setup.
92 logger logrus.FieldLogger
// metrics is built from registry in setup.
93 metrics *volumeMetricsVecs
// registry is a fresh prometheus registry created in setup.
94 registry *prometheus.Registry
// bufferPool is created in setup from logger and registry.
95 bufferPool *bufferPool
// setup (re)initializes the suite's fixtures: the cluster config from
// testCluster, a test logger, a new prometheus registry, metrics
// vectors built from that registry, and a buffer pool.
98 func (s *genericVolumeSuite) setup(t TB) {
99 s.cluster = testCluster(t)
100 s.logger = ctxlog.TestLogger(t)
// A new registry per setup call, so the metrics and buffer pool below
// are created against it rather than a previously used one.
101 s.registry = prometheus.NewRegistry()
102 s.metrics = newVolumeMetricsVecs(s.registry)
// NOTE(review): 8 is presumably the number of buffers in the pool;
// confirm against newBufferPool.
103 s.bufferPool = newBufferPool(s.logger, 8, s.registry)
// newVolume calls factory with t and a newVolumeParams built from the
// suite's fixtures, and returns the TestableVolume the factory
// produces. The volume UUID is a fixed string literal.
106 func (s *genericVolumeSuite) newVolume(t TB, factory TestableVolumeFactory) TestableVolume {
107 return factory(t, newVolumeParams{
108 UUID: "zzzzz-nyw5e-999999999999999",
// s.volume carries the ReadOnly flag set by DoGenericVolumeTests.
110 ConfigVolume: s.volume,
112 MetricsVecs: s.metrics,
113 BufferPool: s.bufferPool,
117 // Put a test block, get it and verify content
118 // Test should pass for both writable and read-only volumes
//
// testGet writes TestBlock under TestHash, reads TestHash back into
// buf, and reports an error if the bytes read differ from TestBlock.
119 func (s *genericVolumeSuite) testGet(t TB, factory TestableVolumeFactory) {
121 v := s.newVolume(t, factory)
124 err := v.BlockWrite(context.Background(), TestHash, TestBlock)
130 err = v.BlockRead(context.Background(), TestHash, buf)
134 if bytes.Compare(buf.Bytes(), TestBlock) != 0 {
// NOTE(review): the message prints the literal "foo" as the expected
// value although the comparison above is against TestBlock.
135 t.Errorf("expected %s, got %s", "foo", buf.String())
139 // Invoke get on a block that does not exist in volume; should result in error
140 // Test should pass for both writable and read-only volumes
//
// testGetNoSuchBlock reads barHash from a newly created volume without
// writing it first, and fails the test if BlockRead returns nil.
141 func (s *genericVolumeSuite) testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
143 v := s.newVolume(t, factory)
// Any non-nil error is accepted; the error's type is not inspected.
146 if err := v.BlockRead(context.Background(), barHash, brdiscard); err == nil {
147 t.Errorf("Expected error while getting non-existing block %v", barHash)
151 // Put a block and put again with same content
152 // Test is intended for only writable volumes
//
// testPutBlockWithSameContent writes testData under testHash twice in
// a row and reports a test error for a write that fails.
153 func (s *genericVolumeSuite) testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
155 v := s.newVolume(t, factory)
158 err := v.BlockWrite(context.Background(), testHash, testData)
// NOTE(review): the message formats TestBlock, not the testData that
// was actually written; the two differ when the caller passes
// EmptyBlock.
160 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
163 err = v.BlockWrite(context.Background(), testHash, testData)
// NOTE(review): same as above, TestBlock is printed instead of testData.
165 t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
169 // Put a block and put again with different content
170 // Test is intended for only writable volumes
//
// testPutBlockWithDifferentContent writes testDataA and then testDataB
// under the same testHash, reads testHash back, and checks that what
// is stored is consistent with the outcome of the second write.
171 func (s *genericVolumeSuite) testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
173 v := s.newVolume(t, factory)
// The result of the first write is not checked.
176 v.BlockWrite(context.Background(), testHash, testDataA)
// Both errors are kept so the checks below can distinguish the
// outcome of the second write from the outcome of the read.
178 putErr := v.BlockWrite(context.Background(), testHash, testDataB)
180 getErr := v.BlockRead(context.Background(), testHash, buf)
182 // Put must not return a nil error unless it has
183 // overwritten the existing data.
184 if buf.String() != string(testDataB) {
185 t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
188 // It is permissible for Put to fail, but it must
189 // leave us with either the original data, the new
190 // data, or nothing at all.
// "Nothing at all" corresponds to getErr != nil, which this condition
// lets through.
191 if getErr == nil && buf.String() != string(testDataA) && buf.String() != string(testDataB) {
192 t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
197 // Put and get multiple blocks
198 // Test is intended for only writable volumes
//
// testPutMultipleBlocks writes three distinct blocks (TestBlock,
// TestBlock2, TestBlock3) to one volume, then reads each hash back and
// compares the bytes read with the block written under that hash.
199 func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
201 v := s.newVolume(t, factory)
// Write phase: all three blocks go to the same volume before any read.
204 err := v.BlockWrite(context.Background(), TestHash, TestBlock)
206 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
209 err = v.BlockWrite(context.Background(), TestHash2, TestBlock2)
211 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
214 err = v.BlockWrite(context.Background(), TestHash3, TestBlock3)
216 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
// Read phase: each hash must return exactly the data written for it.
220 err = v.BlockRead(context.Background(), TestHash, buf)
224 if bytes.Compare(buf.Bytes(), TestBlock) != 0 {
225 t.Errorf("Block present, but got %+q, expected %+q", buf, TestBlock)
230 err = v.BlockRead(context.Background(), TestHash2, buf)
234 if bytes.Compare(buf.Bytes(), TestBlock2) != 0 {
235 t.Errorf("Block present, but got %+q, expected %+q", buf, TestBlock2)
240 err = v.BlockRead(context.Background(), TestHash3, buf)
244 if bytes.Compare(buf.Bytes(), TestBlock3) != 0 {
// NOTE(review): "but to" looks like a typo for "but got", as used in
// the two sibling messages above.
245 t.Errorf("Block present, but to %+q, expected %+q", buf, TestBlock3)
250 // testPutAndTouch checks that when applying PUT to a block that
251 // already exists, the block's modification time is updated. Intended
252 // for only writable volumes.
//
// Sequence: write the block, backdate its mtime by 20 seconds with
// TouchWithDate, confirm Mtime reports the backdated value, write the
// same block again, and confirm Mtime is no longer before threshold.
253 func (s *genericVolumeSuite) testPutAndTouch(t TB, factory TestableVolumeFactory) {
255 v := s.newVolume(t, factory)
258 if err := v.BlockWrite(context.Background(), TestHash, TestBlock); err != nil {
262 // We'll verify { t0 < threshold < t1 }, where t0 is the
263 // existing block's timestamp on disk before BlockWrite() and t1 is
264 // its timestamp after BlockWrite().
// threshold is one second in the past, between the backdated mtime
// (20 seconds ago) and the time of the second write.
265 threshold := time.Now().Add(-time.Second)
267 // Set the stored block's mtime far enough in the past that we
268 // can see the difference between "timestamp didn't change"
269 // and "timestamp granularity is too low".
270 v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
272 // Make sure v.Mtime() agrees the above TouchWithDate really worked.
273 if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
274 t.Errorf("Setting mtime failed: threshold %v, t0 %v, err %v", threshold.UTC(), t0.UTC(), err)
277 // Write the same block again.
278 if err := v.BlockWrite(context.Background(), TestHash, TestBlock); err != nil {
282 // Verify threshold < t1
283 if t1, err := v.Mtime(TestHash); err != nil {
285 } else if t1.Before(threshold) {
// NOTE(review): the message says "v.Put" while the call under test is
// BlockWrite, and it ends with a trailing space.
286 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
290 // Touching a non-existing block should result in error.
291 // Test should pass for both writable and read-only volumes
//
// testTouchNoSuchBlock calls BlockTouch(TestHash) on a newly created
// volume without writing the block first, and fails the test if the
// call returns nil.
292 func (s *genericVolumeSuite) testTouchNoSuchBlock(t TB, factory TestableVolumeFactory) {
294 v := s.newVolume(t, factory)
// Any non-nil error is accepted; the error's type is not inspected.
297 if err := v.BlockTouch(TestHash); err == nil {
298 t.Error("Expected error when attempted to touch a non-existing block")
302 // Invoking Mtime on a non-existing block should result in error.
303 // Test should pass for both writable and read-only volumes
//
// testMtimeNoSuchBlock asks a newly created volume for the Mtime of a
// 32-character hash that this test never writes, and fails the test if
// no error is returned.
304 func (s *genericVolumeSuite) testMtimeNoSuchBlock(t TB, factory TestableVolumeFactory) {
306 v := s.newVolume(t, factory)
309 if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
// NOTE(review): the message says "updating Mtime" although the call
// above only reads it.
310 t.Error("Expected error when updating Mtime on a non-existing block")
// testIndex writes three test blocks plus four blocks with malformed
// names, then checks the output of Index for the empty prefix, for
// prefixes "f", "f15" and "f15ac", and for prefixes that match
// nothing.
314 // Put a few blocks and invoke Index with:
317 // * with no such prefix
318 // Test should pass for both writable and read-only volumes
319 func (s *genericVolumeSuite) testIndex(t TB, factory TestableVolumeFactory) {
321 v := s.newVolume(t, factory)
324 // minMtime and maxMtime are the minimum and maximum
325 // acceptable values the index can report for our test
326 // blocks. 1-second precision is acceptable.
// Both bounds are in nanoseconds since the epoch. minMtime is
// truncated down to a whole second.
327 minMtime := time.Now().UTC().UnixNano()
328 minMtime -= minMtime % 1e9
// Results of these writes are not checked here.
330 v.BlockWrite(context.Background(), TestHash, TestBlock)
331 v.BlockWrite(context.Background(), TestHash2, TestBlock2)
332 v.BlockWrite(context.Background(), TestHash3, TestBlock3)
// maxMtime is taken after the writes; when it has a sub-second part,
// that part is stripped.
334 maxMtime := time.Now().UTC().UnixNano()
335 if maxMtime%1e9 > 0 {
336 maxMtime -= maxMtime % 1e9
340 // Blocks whose names aren't Keep hashes should be omitted from
// The four names below are, in order: 32 characters including
// non-hex letters, 32 uppercase hex characters, 33 characters, and 3
// characters.
342 v.BlockWrite(context.Background(), "fffffffffnotreallyahashfffffffff", nil)
343 v.BlockWrite(context.Background(), "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
344 v.BlockWrite(context.Background(), "f0000000000000000000000000000000f", nil)
345 v.BlockWrite(context.Background(), "f00", nil)
// Empty prefix: sort the index rows so the comparison does not depend
// on the order in which the volume lists blocks.
347 buf := new(bytes.Buffer)
348 v.Index(context.Background(), "", buf)
349 indexRows := strings.Split(string(buf.Bytes()), "\n")
350 sort.Strings(indexRows)
351 sortedIndex := strings.Join(indexRows, "\n")
// The pattern expects exactly three rows, in the order TestHash,
// TestHash3, TestHash2, and captures the timestamp of the TestHash
// row. The leading `\n` matches an empty row that sorts first.
// NOTE(review): the empty row presumably comes from splitting output
// that ends with a newline; confirm against Index implementations.
352 m := regexp.MustCompile(
353 `^\n` + TestHash + `\+\d+ (\d+)\n` +
354 TestHash3 + `\+\d+ \d+\n` +
355 TestHash2 + `\+\d+ \d+$`,
356 ).FindStringSubmatch(sortedIndex)
358 t.Errorf("Got index %q for empty prefix", sortedIndex)
// m[1] is the captured timestamp; it must lie within
// [minMtime, maxMtime].
360 mtime, err := strconv.ParseInt(m[1], 10, 64)
363 } else if mtime < minMtime || mtime > maxMtime {
364 t.Errorf("got %d for TestHash timestamp, expected %d <= t <= %d",
365 mtime, minMtime, maxMtime)
// Each of these prefixes is expected to yield exactly one row, the
// one for TestHash2.
369 for _, prefix := range []string{"f", "f15", "f15ac"} {
370 buf = new(bytes.Buffer)
371 v.Index(context.Background(), prefix, buf)
373 m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
377 t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
// Prefixes that match no block must give empty output and no error.
381 for _, prefix := range []string{"zero", "zip", "zilch"} {
382 buf = new(bytes.Buffer)
383 err := v.Index(context.Background(), prefix, buf)
385 t.Errorf("Got error on Index with no such prefix %v", err.Error())
386 } else if buf.Len() != 0 {
387 t.Errorf("Expected empty list for Index with no such prefix %s", prefix)
392 // Calling Delete() for a block immediately after writing it (not old enough)
393 // should neither delete the data nor return an error.
394 // Test is intended for only writable volumes
//
// "Delete" here is BlockTrash. The test sets BlobSigningTTL to 5
// minutes, writes TestBlock, trashes it right away, and then checks
// that reading TestHash still returns TestBlock.
395 func (s *genericVolumeSuite) testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
// The TTL is set before the volume is created.
397 s.cluster.Collections.BlobSigningTTL.Set("5m")
398 v := s.newVolume(t, factory)
// Result of this write is not checked.
401 v.BlockWrite(context.Background(), TestHash, TestBlock)
403 if err := v.BlockTrash(TestHash); err != nil {
// The data must still be readable and unchanged after BlockTrash.
407 err := v.BlockRead(context.Background(), TestHash, buf)
410 } else if buf.String() != string(TestBlock) {
411 t.Errorf("Got data %+q, expected %+q", buf.String(), TestBlock)
415 // Calling Delete() for a block with a timestamp older than
416 // BlobSigningTTL seconds in the past should delete the data. Test is
417 // intended for only writable volumes
//
// "Delete" here is BlockTrash. After trashing a block backdated to
// twice the BlobSigningTTL, the test expects BlockRead, Mtime and
// BlockTouch to each return an error satisfying os.IsNotExist, and
// the index to no longer mention TestHash.
418 func (s *genericVolumeSuite) testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
420 s.cluster.Collections.BlobSigningTTL.Set("5m")
421 v := s.newVolume(t, factory)
// Write the block, then backdate its mtime to 2*BlobSigningTTL ago.
424 v.BlockWrite(context.Background(), TestHash, TestBlock)
425 v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
427 if err := v.BlockTrash(TestHash); err != nil {
// Read must fail specifically with a not-exist error.
430 if err := v.BlockRead(context.Background(), TestHash, brdiscard); err == nil || !os.IsNotExist(err) {
431 t.Errorf("os.IsNotExist(%v) should have been true", err)
// Mtime must fail the same way.
434 _, err := v.Mtime(TestHash)
435 if err == nil || !os.IsNotExist(err) {
436 t.Errorf("os.IsNotExist(%v) should have been true", err)
// The full index (empty prefix) must not contain the trashed hash.
439 indexBuf := new(bytes.Buffer)
440 v.Index(context.Background(), "", indexBuf)
441 if strings.Contains(string(indexBuf.Bytes()), TestHash) {
442 t.Errorf("Found trashed block in Index")
// Touch must fail the same way as Read and Mtime.
445 err = v.BlockTouch(TestHash)
446 if err == nil || !os.IsNotExist(err) {
447 t.Errorf("os.IsNotExist(%v) should have been true", err)
451 // Calling Delete() for a block that does not exist should result in error.
452 // Test should pass for both writable and read-only volumes
//
// "Delete" here is BlockTrash. The test trashes TestHash2 on a newly
// created volume without writing it first, and fails if BlockTrash
// returns nil.
453 func (s *genericVolumeSuite) testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
455 v := s.newVolume(t, factory)
// Any non-nil error is accepted; the error's type is not inspected.
458 if err := v.BlockTrash(TestHash2); err == nil {
459 t.Errorf("Expected error when attempting to delete a non-existing block")
// getValueFrom looks up the counter selected by lbls in cv and returns
// a counter value as a float64.
463 func getValueFrom(cv *prometheus.CounterVec, lbls prometheus.Labels) float64 {
// The error returned by GetMetricWith is discarded.
464 c, _ := cv.GetMetricWith(lbls)
467 return pb.GetCounter().GetValue()
// testMetrics checks the volume's operation and I/O byte counters.
// They must exist, the byte counters must start at zero, and the
// counters must grow when a block is written and when it is read.
470 func (s *genericVolumeSuite) testMetrics(t TB, readonly bool, factory TestableVolumeFactory) {
474 v := s.newVolume(t, factory)
// opsC and ioC are the first and third values returned for this
// volume's device_id label; the second value is ignored.
477 opsC, _, ioC := s.metrics.getCounterVecsFor(prometheus.Labels{"device_id": v.DeviceID()})
480 t.Error("ioBytes CounterVec is nil")
// Before any I/O, the "in" and "out" byte counters must sum to zero.
484 if getValueFrom(ioC, prometheus.Labels{"direction": "out"})+
485 getValueFrom(ioC, prometheus.Labels{"direction": "in"}) > 0 {
486 t.Error("ioBytes counter should be zero")
490 t.Error("opsCounter CounterVec is nil")
494 var c, writeOpCounter, readOpCounter float64
// The label values for read and write operations are supplied by the
// volume itself. Baselines are captured before any operation so the
// later checks can detect an increase.
496 readOpType, writeOpType := v.ReadWriteOperationLabelValues()
497 writeOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
498 readOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
500 // Test Put if volume is writable
502 err = v.BlockWrite(context.Background(), TestHash, TestBlock)
504 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
506 // Check that the write operations counter increased
507 c = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
508 if c <= writeOpCounter {
509 t.Error("Operation(s) not counted on Put")
511 // Check that bytes counter is > 0
512 if getValueFrom(ioC, prometheus.Labels{"direction": "out"}) == 0 {
513 t.Error("ioBytes{direction=out} counter shouldn't be zero")
// A write whose result is not checked.
516 v.BlockWrite(context.Background(), TestHash, TestBlock)
// The data read is sent to brdiscard; only the error and the counters
// are examined.
519 err = v.BlockRead(context.Background(), TestHash, brdiscard)
524 // Check that the operations counter increased
525 c = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
526 if c <= readOpCounter {
527 t.Error("Operation(s) not counted on Get")
529 // Check that the bytes "in" counter is > 0
530 if getValueFrom(ioC, prometheus.Labels{"direction": "in"}) == 0 {
531 t.Error("ioBytes{direction=in} counter shouldn't be zero")
535 // Launch concurrent Gets
536 // Test should pass for both writable and read-only volumes
//
// testGetConcurrent writes three blocks, then reads each one back and
// compares the result with the data written; a final loop waits for
// the three readers to finish.
537 func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFactory) {
539 v := s.newVolume(t, factory)
// Results of these writes are not checked.
542 v.BlockWrite(context.Background(), TestHash, TestBlock)
543 v.BlockWrite(context.Background(), TestHash2, TestBlock2)
544 v.BlockWrite(context.Background(), TestHash3, TestBlock3)
// sem is an unbuffered channel.
// NOTE(review): presumably each reader signals completion on it;
// confirm.
546 sem := make(chan int)
// Reader 1: TestHash must yield TestBlock.
549 err := v.BlockRead(context.Background(), TestHash, buf)
551 t.Errorf("err1: %v", err)
553 if buf.String() != string(TestBlock) {
554 t.Errorf("buf should be %s, is %s", TestBlock, buf)
// Reader 2: TestHash2 must yield TestBlock2.
561 err := v.BlockRead(context.Background(), TestHash2, buf)
563 t.Errorf("err2: %v", err)
565 if buf.String() != string(TestBlock2) {
566 t.Errorf("buf should be %s, is %s", TestBlock2, buf)
// Reader 3: TestHash3 must yield TestBlock3.
573 err := v.BlockRead(context.Background(), TestHash3, buf)
575 t.Errorf("err3: %v", err)
577 if buf.String() != string(TestBlock3) {
578 t.Errorf("buf should be %s, is %s", TestBlock3, buf)
583 // Wait for all goroutines to finish
// The bound of 3 matches the number of readers above.
584 for done := 0; done < 3; done++ {
589 // Launch concurrent Puts
590 // Test is intended for only writable volumes
//
// testPutConcurrent writes three hash/data pairs to one volume, using
// a sync.WaitGroup in the write phase, and afterwards reads each hash
// back to verify the stored data.
591 func (s *genericVolumeSuite) testPutConcurrent(t TB, factory TestableVolumeFactory) {
593 v := s.newVolume(t, factory)
// The hash/data pairs to write and later verify.
600 {hash: TestHash, data: TestBlock},
601 {hash: TestHash2, data: TestBlock2},
602 {hash: TestHash3, data: TestBlock3},
605 var wg sync.WaitGroup
606 for _, blk := range blks {
611 err := v.BlockWrite(context.Background(), blk.hash, blk.data)
613 t.Errorf("%s: %v", blk.hash, err)
619 // Check that we actually wrote the blocks.
620 for _, blk := range blks {
622 err := v.BlockRead(context.Background(), blk.hash, buf)
624 t.Errorf("get %s: %v", blk.hash, err)
625 } else if buf.String() != string(blk.data) {
626 t.Errorf("get %s: expected %s, got %s", blk.hash, blk.data, buf)
631 // Write and read back a full size block
//
// testPutFullBlock builds a BlockSize-byte buffer, writes it under its
// own MD5 hash, reads it back, and compares the result with the data
// written.
632 func (s *genericVolumeSuite) testPutFullBlock(t TB, factory TestableVolumeFactory) {
634 v := s.newVolume(t, factory)
637 wdata := make([]byte, BlockSize)
// The last byte is set to a non-zero value.
639 wdata[BlockSize-1] = 'z'
// The block's hash is the lowercase hex MD5 of its content.
640 hash := fmt.Sprintf("%x", md5.Sum(wdata))
641 err := v.BlockWrite(context.Background(), hash, wdata)
647 err = v.BlockRead(context.Background(), hash, buf)
651 if buf.String() != string(wdata) {
// Only the lengths are printed, not the block contents.
652 t.Errorf("buf (len %d) != wdata (len %d)", buf.Len(), len(wdata))
656 // With BlobTrashLifetime != 0, perform:
657 // Trash an old block - which either raises ErrNotImplemented or succeeds
658 // Untrash - which either raises ErrNotImplemented or succeeds
659 // Get - which must succeed
//
// testTrashUntrash sets BlobTrashLifetime to one hour, writes and
// backdates a block, verifies it is readable, trashes it, untrashes
// it, and verifies it is readable again with the original content.
660 func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory TestableVolumeFactory) {
// The trash lifetime is set before the volume is created.
662 s.cluster.Collections.BlobTrashLifetime.Set("1h")
663 v := s.newVolume(t, factory)
666 // put block and backdate it
// The mtime is moved to 2*BlobSigningTTL in the past.
667 v.BlockWrite(context.Background(), TestHash, TestBlock)
668 v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
// Sanity check: the block is readable before it is trashed.
671 err := v.BlockRead(context.Background(), TestHash, buf)
675 if buf.String() != string(TestBlock) {
676 t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
680 err = v.BlockTrash(TestHash)
// This read is expected to fail with an error satisfying
// os.IsNotExist.
686 err = v.BlockRead(context.Background(), TestHash, buf)
687 if err == nil || !os.IsNotExist(err) {
688 t.Errorf("os.IsNotExist(%v) should have been true", err)
692 err = v.BlockUntrash(TestHash)
697 // Get the block - after trash and untrash sequence
699 err = v.BlockRead(context.Background(), TestHash, buf)
703 if buf.String() != string(TestBlock) {
704 t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
708 func (s *genericVolumeSuite) testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
710 v := s.newVolume(t, factory)
713 checkGet := func() error {
715 err := v.BlockRead(context.Background(), TestHash, buf)
719 if buf.String() != string(TestBlock) {
720 t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
723 _, err = v.Mtime(TestHash)
728 indexBuf := new(bytes.Buffer)
729 v.Index(context.Background(), "", indexBuf)
730 if !strings.Contains(string(indexBuf.Bytes()), TestHash) {
731 return os.ErrNotExist
737 // First set: EmptyTrash before reaching the trash deadline.
739 s.cluster.Collections.BlobTrashLifetime.Set("1h")
741 v.BlockWrite(context.Background(), TestHash, TestBlock)
742 v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
750 err = v.BlockTrash(TestHash)
756 if err == nil || !os.IsNotExist(err) {
757 t.Errorf("os.IsNotExist(%v) should have been true", err)
760 err = v.BlockTouch(TestHash)
761 if err == nil || !os.IsNotExist(err) {
762 t.Errorf("os.IsNotExist(%v) should have been true", err)
767 // Even after emptying the trash, we can untrash our block
768 // because the deadline hasn't been reached.
769 err = v.BlockUntrash(TestHash)
779 err = v.BlockTouch(TestHash)
784 // Because we Touch'ed, need to backdate again for next set of tests
785 v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
787 // If the only block in the trash has already been untrashed,
788 // most volumes will fail a subsequent Untrash with a 404, but
789 // it's also acceptable for Untrash to succeed.
790 err = v.BlockUntrash(TestHash)
791 if err != nil && !os.IsNotExist(err) {
792 t.Errorf("Expected success or os.IsNotExist(), but got: %v", err)
795 // The additional Untrash should not interfere with our
796 // already-untrashed copy.
802 // Untrash might have updated the timestamp, so backdate again
803 v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
805 // Second set: EmptyTrash after the trash deadline has passed.
807 s.cluster.Collections.BlobTrashLifetime.Set("1ns")
809 err = v.BlockTrash(TestHash)
814 if err == nil || !os.IsNotExist(err) {
815 t.Errorf("os.IsNotExist(%v) should have been true", err)
818 // Even though 1ns has passed, we can untrash because we
819 // haven't called EmptyTrash yet.
820 err = v.BlockUntrash(TestHash)
829 // Trash it again, and this time call EmptyTrash so it really
831 // (In Azure volumes, un/trash changes Mtime, so first backdate again)
832 v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
833 _ = v.BlockTrash(TestHash)
835 if err == nil || !os.IsNotExist(err) {
836 t.Errorf("os.IsNotExist(%v) should have been true", err)
840 // Untrash won't find it
841 err = v.BlockUntrash(TestHash)
842 if err == nil || !os.IsNotExist(err) {
843 t.Errorf("os.IsNotExist(%v) should have been true", err)
846 // Get block won't find it
848 if err == nil || !os.IsNotExist(err) {
849 t.Errorf("os.IsNotExist(%v) should have been true", err)
852 // Third set: If the same data block gets written again after
853 // being trashed, and then the trash gets emptied, the newer
854 // un-trashed copy doesn't get deleted along with it.
856 v.BlockWrite(context.Background(), TestHash, TestBlock)
857 v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
859 s.cluster.Collections.BlobTrashLifetime.Set("1ns")
860 err = v.BlockTrash(TestHash)
865 if err == nil || !os.IsNotExist(err) {
866 t.Errorf("os.IsNotExist(%v) should have been true", err)
869 v.BlockWrite(context.Background(), TestHash, TestBlock)
870 v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
872 // EmptyTrash should not delete the untrashed copy.
879 // Fourth set: If the same data block gets trashed twice with
880 // different deadlines A and C, and then the trash is emptied
881 // at intermediate time B (A < B < C), it is still possible to
882 // untrash the block whose deadline is "C".
884 v.BlockWrite(context.Background(), TestHash, TestBlock)
885 v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
887 s.cluster.Collections.BlobTrashLifetime.Set("1ns")
888 err = v.BlockTrash(TestHash)
893 v.BlockWrite(context.Background(), TestHash, TestBlock)
894 v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
896 s.cluster.Collections.BlobTrashLifetime.Set("1h")
897 err = v.BlockTrash(TestHash)
902 // EmptyTrash should not prevent us from recovering the
903 // time.Hour ("C") trash
905 err = v.BlockUntrash(TestHash)