2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / volume_generic_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "os"
14         "regexp"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19         "time"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/arvadostest"
23         "git.arvados.org/arvados.git/sdk/go/ctxlog"
24         "github.com/prometheus/client_golang/prometheus"
25         dto "github.com/prometheus/client_model/go"
26         "github.com/sirupsen/logrus"
27 )
28
29 type TB interface {
30         Error(args ...interface{})
31         Errorf(format string, args ...interface{})
32         Fail()
33         FailNow()
34         Failed() bool
35         Fatal(args ...interface{})
36         Fatalf(format string, args ...interface{})
37         Log(args ...interface{})
38         Logf(format string, args ...interface{})
39 }
40
41 // A TestableVolumeFactory returns a new TestableVolume. The factory
42 // function, and the TestableVolume it returns, can use "t" to write
43 // logs, fail the current test, etc.
44 type TestableVolumeFactory func(t TB, params newVolumeParams) TestableVolume
45
46 // DoGenericVolumeTests runs a set of tests that every TestableVolume
47 // is expected to pass. It calls factory to create a new TestableVolume
48 // for each test case, to avoid leaking state between tests.
49 func DoGenericVolumeTests(t TB, readonly bool, factory TestableVolumeFactory) {
50         var s genericVolumeSuite
51         s.volume.ReadOnly = readonly
52
53         s.testGet(t, factory)
54         s.testGetNoSuchBlock(t, factory)
55
56         if !readonly {
57                 s.testPutBlockWithSameContent(t, factory, TestHash, TestBlock)
58                 s.testPutBlockWithSameContent(t, factory, EmptyHash, EmptyBlock)
59                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], arvadostest.MD5CollisionData[1])
60                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, EmptyBlock, arvadostest.MD5CollisionData[0])
61                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], EmptyBlock)
62                 s.testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, arvadostest.MD5CollisionData[0])
63                 s.testPutMultipleBlocks(t, factory)
64
65                 s.testPutAndTouch(t, factory)
66         }
67         s.testTouchNoSuchBlock(t, factory)
68
69         s.testMtimeNoSuchBlock(t, factory)
70
71         s.testIndex(t, factory)
72
73         if !readonly {
74                 s.testDeleteNewBlock(t, factory)
75                 s.testDeleteOldBlock(t, factory)
76         }
77         s.testDeleteNoSuchBlock(t, factory)
78
79         s.testMetrics(t, readonly, factory)
80
81         s.testGetConcurrent(t, factory)
82         if !readonly {
83                 s.testPutConcurrent(t, factory)
84                 s.testPutFullBlock(t, factory)
85                 s.testTrashUntrash(t, readonly, factory)
86                 s.testTrashEmptyTrashUntrash(t, factory)
87         }
88 }
89
90 type genericVolumeSuite struct {
91         cluster    *arvados.Cluster
92         volume     arvados.Volume
93         logger     logrus.FieldLogger
94         metrics    *volumeMetricsVecs
95         registry   *prometheus.Registry
96         bufferPool *bufferPool
97 }
98
99 func (s *genericVolumeSuite) setup(t TB) {
100         s.cluster = testCluster(t)
101         s.logger = ctxlog.TestLogger(t)
102         s.registry = prometheus.NewRegistry()
103         s.metrics = newVolumeMetricsVecs(s.registry)
104         s.bufferPool = newBufferPool(s.logger, 8, s.registry)
105 }
106
107 func (s *genericVolumeSuite) newVolume(t TB, factory TestableVolumeFactory) TestableVolume {
108         return factory(t, newVolumeParams{
109                 UUID:         "zzzzz-nyw5e-999999999999999",
110                 Cluster:      s.cluster,
111                 ConfigVolume: s.volume,
112                 Logger:       s.logger,
113                 MetricsVecs:  s.metrics,
114                 BufferPool:   s.bufferPool,
115         })
116 }
117
118 // Put a test block, get it and verify content
119 // Test should pass for both writable and read-only volumes
120 func (s *genericVolumeSuite) testGet(t TB, factory TestableVolumeFactory) {
121         s.setup(t)
122         v := s.newVolume(t, factory)
123         defer v.Teardown()
124
125         err := v.BlockWrite(context.Background(), TestHash, TestBlock)
126         if err != nil {
127                 t.Error(err)
128         }
129
130         buf := bytes.NewBuffer(nil)
131         _, err = v.BlockRead(context.Background(), TestHash, buf)
132         if err != nil {
133                 t.Error(err)
134         }
135         if bytes.Compare(buf.Bytes(), TestBlock) != 0 {
136                 t.Errorf("expected %s, got %s", "foo", buf.String())
137         }
138 }
139
140 // Invoke get on a block that does not exist in volume; should result in error
141 // Test should pass for both writable and read-only volumes
142 func (s *genericVolumeSuite) testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
143         s.setup(t)
144         v := s.newVolume(t, factory)
145         defer v.Teardown()
146
147         if _, err := v.BlockRead(context.Background(), barHash, io.Discard); err == nil {
148                 t.Errorf("Expected error while getting non-existing block %v", barHash)
149         }
150 }
151
152 // Put a block and put again with same content
153 // Test is intended for only writable volumes
154 func (s *genericVolumeSuite) testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
155         s.setup(t)
156         v := s.newVolume(t, factory)
157         defer v.Teardown()
158
159         err := v.BlockWrite(context.Background(), testHash, testData)
160         if err != nil {
161                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
162         }
163
164         err = v.BlockWrite(context.Background(), testHash, testData)
165         if err != nil {
166                 t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
167         }
168 }
169
170 // Put a block and put again with different content
171 // Test is intended for only writable volumes
172 func (s *genericVolumeSuite) testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
173         s.setup(t)
174         v := s.newVolume(t, factory)
175         defer v.Teardown()
176
177         v.BlockWrite(context.Background(), testHash, testDataA)
178
179         putErr := v.BlockWrite(context.Background(), testHash, testDataB)
180         buf := bytes.NewBuffer(nil)
181         _, getErr := v.BlockRead(context.Background(), testHash, buf)
182         if putErr == nil {
183                 // Put must not return a nil error unless it has
184                 // overwritten the existing data.
185                 if buf.String() != string(testDataB) {
186                         t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
187                 }
188         } else {
189                 // It is permissible for Put to fail, but it must
190                 // leave us with either the original data, the new
191                 // data, or nothing at all.
192                 if getErr == nil && buf.String() != string(testDataA) && buf.String() != string(testDataB) {
193                         t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
194                 }
195         }
196 }
197
198 // Put and get multiple blocks
199 // Test is intended for only writable volumes
200 func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
201         s.setup(t)
202         v := s.newVolume(t, factory)
203         defer v.Teardown()
204
205         err := v.BlockWrite(context.Background(), TestHash, TestBlock)
206         if err != nil {
207                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
208         }
209
210         err = v.BlockWrite(context.Background(), TestHash2, TestBlock2)
211         if err != nil {
212                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
213         }
214
215         err = v.BlockWrite(context.Background(), TestHash3, TestBlock3)
216         if err != nil {
217                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
218         }
219
220         buf := bytes.NewBuffer(nil)
221         _, err = v.BlockRead(context.Background(), TestHash, buf)
222         if err != nil {
223                 t.Error(err)
224         } else {
225                 if bytes.Compare(buf.Bytes(), TestBlock) != 0 {
226                         t.Errorf("Block present, but got %+q, expected %+q", buf, TestBlock)
227                 }
228         }
229
230         buf.Reset()
231         _, err = v.BlockRead(context.Background(), TestHash2, buf)
232         if err != nil {
233                 t.Error(err)
234         } else {
235                 if bytes.Compare(buf.Bytes(), TestBlock2) != 0 {
236                         t.Errorf("Block present, but got %+q, expected %+q", buf, TestBlock2)
237                 }
238         }
239
240         buf.Reset()
241         _, err = v.BlockRead(context.Background(), TestHash3, buf)
242         if err != nil {
243                 t.Error(err)
244         } else {
245                 if bytes.Compare(buf.Bytes(), TestBlock3) != 0 {
246                         t.Errorf("Block present, but to %+q, expected %+q", buf, TestBlock3)
247                 }
248         }
249 }
250
251 // testPutAndTouch checks that when applying PUT to a block that
252 // already exists, the block's modification time is updated.  Intended
253 // for only writable volumes.
254 func (s *genericVolumeSuite) testPutAndTouch(t TB, factory TestableVolumeFactory) {
255         s.setup(t)
256         v := s.newVolume(t, factory)
257         defer v.Teardown()
258
259         if err := v.BlockWrite(context.Background(), TestHash, TestBlock); err != nil {
260                 t.Error(err)
261         }
262
263         // We'll verify { t0 < threshold < t1 }, where t0 is the
264         // existing block's timestamp on disk before BlockWrite() and t1 is
265         // its timestamp after BlockWrite().
266         threshold := time.Now().Add(-time.Second)
267
268         // Set the stored block's mtime far enough in the past that we
269         // can see the difference between "timestamp didn't change"
270         // and "timestamp granularity is too low".
271         v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
272
273         // Make sure v.Mtime() agrees the above Utime really worked.
274         if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
275                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
276         }
277
278         // Write the same block again.
279         if err := v.BlockWrite(context.Background(), TestHash, TestBlock); err != nil {
280                 t.Error(err)
281         }
282
283         // Verify threshold < t1
284         if t1, err := v.Mtime(TestHash); err != nil {
285                 t.Error(err)
286         } else if t1.Before(threshold) {
287                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
288         }
289 }
290
291 // Touching a non-existing block should result in error.
292 // Test should pass for both writable and read-only volumes
293 func (s *genericVolumeSuite) testTouchNoSuchBlock(t TB, factory TestableVolumeFactory) {
294         s.setup(t)
295         v := s.newVolume(t, factory)
296         defer v.Teardown()
297
298         if err := v.BlockTouch(TestHash); err == nil {
299                 t.Error("Expected error when attempted to touch a non-existing block")
300         }
301 }
302
303 // Invoking Mtime on a non-existing block should result in error.
304 // Test should pass for both writable and read-only volumes
305 func (s *genericVolumeSuite) testMtimeNoSuchBlock(t TB, factory TestableVolumeFactory) {
306         s.setup(t)
307         v := s.newVolume(t, factory)
308         defer v.Teardown()
309
310         if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
311                 t.Error("Expected error when updating Mtime on a non-existing block")
312         }
313 }
314
315 // Put a few blocks and invoke Index with:
316 // * no prefix
317 // * with a prefix
318 // * with no such prefix
319 // Test should pass for both writable and read-only volumes
320 func (s *genericVolumeSuite) testIndex(t TB, factory TestableVolumeFactory) {
321         s.setup(t)
322         v := s.newVolume(t, factory)
323         defer v.Teardown()
324
325         // minMtime and maxMtime are the minimum and maximum
326         // acceptable values the index can report for our test
327         // blocks. 1-second precision is acceptable.
328         minMtime := time.Now().UTC().UnixNano()
329         minMtime -= minMtime % 1e9
330
331         v.BlockWrite(context.Background(), TestHash, TestBlock)
332         v.BlockWrite(context.Background(), TestHash2, TestBlock2)
333         v.BlockWrite(context.Background(), TestHash3, TestBlock3)
334
335         maxMtime := time.Now().UTC().UnixNano()
336         if maxMtime%1e9 > 0 {
337                 maxMtime -= maxMtime % 1e9
338                 maxMtime += 1e9
339         }
340
341         // Blocks whose names aren't Keep hashes should be omitted from
342         // index
343         v.BlockWrite(context.Background(), "fffffffffnotreallyahashfffffffff", nil)
344         v.BlockWrite(context.Background(), "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
345         v.BlockWrite(context.Background(), "f0000000000000000000000000000000f", nil)
346         v.BlockWrite(context.Background(), "f00", nil)
347
348         buf := new(bytes.Buffer)
349         v.Index(context.Background(), "", buf)
350         indexRows := strings.Split(string(buf.Bytes()), "\n")
351         sort.Strings(indexRows)
352         sortedIndex := strings.Join(indexRows, "\n")
353         m := regexp.MustCompile(
354                 `^\n` + TestHash + `\+\d+ (\d+)\n` +
355                         TestHash3 + `\+\d+ \d+\n` +
356                         TestHash2 + `\+\d+ \d+$`,
357         ).FindStringSubmatch(sortedIndex)
358         if m == nil {
359                 t.Errorf("Got index %q for empty prefix", sortedIndex)
360         } else {
361                 mtime, err := strconv.ParseInt(m[1], 10, 64)
362                 if err != nil {
363                         t.Error(err)
364                 } else if mtime < minMtime || mtime > maxMtime {
365                         t.Errorf("got %d for TestHash timestamp, expected %d <= t <= %d",
366                                 mtime, minMtime, maxMtime)
367                 }
368         }
369
370         for _, prefix := range []string{"f", "f15", "f15ac"} {
371                 buf = new(bytes.Buffer)
372                 v.Index(context.Background(), prefix, buf)
373
374                 m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
375                 if err != nil {
376                         t.Error(err)
377                 } else if !m {
378                         t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
379                 }
380         }
381
382         for _, prefix := range []string{"zero", "zip", "zilch"} {
383                 buf = new(bytes.Buffer)
384                 err := v.Index(context.Background(), prefix, buf)
385                 if err != nil {
386                         t.Errorf("Got error on Index with no such prefix %v", err.Error())
387                 } else if buf.Len() != 0 {
388                         t.Errorf("Expected empty list for Index with no such prefix %s", prefix)
389                 }
390         }
391 }
392
393 // Calling Delete() for a block immediately after writing it (not old enough)
394 // should neither delete the data nor return an error.
395 // Test is intended for only writable volumes
396 func (s *genericVolumeSuite) testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
397         s.setup(t)
398         s.cluster.Collections.BlobSigningTTL.Set("5m")
399         v := s.newVolume(t, factory)
400         defer v.Teardown()
401
402         v.BlockWrite(context.Background(), TestHash, TestBlock)
403
404         if err := v.BlockTrash(TestHash); err != nil {
405                 t.Error(err)
406         }
407         buf := bytes.NewBuffer(nil)
408         _, err := v.BlockRead(context.Background(), TestHash, buf)
409         if err != nil {
410                 t.Error(err)
411         } else if buf.String() != string(TestBlock) {
412                 t.Errorf("Got data %+q, expected %+q", buf.String(), TestBlock)
413         }
414 }
415
416 // Calling Delete() for a block with a timestamp older than
417 // BlobSigningTTL seconds in the past should delete the data.  Test is
418 // intended for only writable volumes
419 func (s *genericVolumeSuite) testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
420         s.setup(t)
421         s.cluster.Collections.BlobSigningTTL.Set("5m")
422         v := s.newVolume(t, factory)
423         defer v.Teardown()
424
425         v.BlockWrite(context.Background(), TestHash, TestBlock)
426         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
427
428         if err := v.BlockTrash(TestHash); err != nil {
429                 t.Error(err)
430         }
431         if _, err := v.BlockRead(context.Background(), TestHash, io.Discard); err == nil || !os.IsNotExist(err) {
432                 t.Errorf("os.IsNotExist(%v) should have been true", err)
433         }
434
435         _, err := v.Mtime(TestHash)
436         if err == nil || !os.IsNotExist(err) {
437                 t.Errorf("os.IsNotExist(%v) should have been true", err)
438         }
439
440         indexBuf := new(bytes.Buffer)
441         v.Index(context.Background(), "", indexBuf)
442         if strings.Contains(string(indexBuf.Bytes()), TestHash) {
443                 t.Errorf("Found trashed block in Index")
444         }
445
446         err = v.BlockTouch(TestHash)
447         if err == nil || !os.IsNotExist(err) {
448                 t.Errorf("os.IsNotExist(%v) should have been true", err)
449         }
450 }
451
452 // Calling Delete() for a block that does not exist should result in error.
453 // Test should pass for both writable and read-only volumes
454 func (s *genericVolumeSuite) testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
455         s.setup(t)
456         v := s.newVolume(t, factory)
457         defer v.Teardown()
458
459         if err := v.BlockTrash(TestHash2); err == nil {
460                 t.Errorf("Expected error when attempting to delete a non-existing block")
461         }
462 }
463
464 func getValueFrom(cv *prometheus.CounterVec, lbls prometheus.Labels) float64 {
465         c, _ := cv.GetMetricWith(lbls)
466         pb := &dto.Metric{}
467         c.Write(pb)
468         return pb.GetCounter().GetValue()
469 }
470
471 func (s *genericVolumeSuite) testMetrics(t TB, readonly bool, factory TestableVolumeFactory) {
472         var err error
473
474         s.setup(t)
475         v := s.newVolume(t, factory)
476         defer v.Teardown()
477
478         opsC, _, ioC := s.metrics.getCounterVecsFor(prometheus.Labels{"device_id": v.DeviceID()})
479
480         if ioC == nil {
481                 t.Error("ioBytes CounterVec is nil")
482                 return
483         }
484
485         if getValueFrom(ioC, prometheus.Labels{"direction": "out"})+
486                 getValueFrom(ioC, prometheus.Labels{"direction": "in"}) > 0 {
487                 t.Error("ioBytes counter should be zero")
488         }
489
490         if opsC == nil {
491                 t.Error("opsCounter CounterVec is nil")
492                 return
493         }
494
495         var c, writeOpCounter, readOpCounter float64
496
497         readOpType, writeOpType := v.ReadWriteOperationLabelValues()
498         writeOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
499         readOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
500
501         // Test Put if volume is writable
502         if !readonly {
503                 err = v.BlockWrite(context.Background(), TestHash, TestBlock)
504                 if err != nil {
505                         t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
506                 }
507                 // Check that the write operations counter increased
508                 c = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
509                 if c <= writeOpCounter {
510                         t.Error("Operation(s) not counted on Put")
511                 }
512                 // Check that bytes counter is > 0
513                 if getValueFrom(ioC, prometheus.Labels{"direction": "out"}) == 0 {
514                         t.Error("ioBytes{direction=out} counter shouldn't be zero")
515                 }
516         } else {
517                 v.BlockWrite(context.Background(), TestHash, TestBlock)
518         }
519
520         _, err = v.BlockRead(context.Background(), TestHash, io.Discard)
521         if err != nil {
522                 t.Error(err)
523         }
524
525         // Check that the operations counter increased
526         c = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
527         if c <= readOpCounter {
528                 t.Error("Operation(s) not counted on Get")
529         }
530         // Check that the bytes "in" counter is > 0
531         if getValueFrom(ioC, prometheus.Labels{"direction": "in"}) == 0 {
532                 t.Error("ioBytes{direction=in} counter shouldn't be zero")
533         }
534 }
535
536 // Launch concurrent Gets
537 // Test should pass for both writable and read-only volumes
538 func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFactory) {
539         s.setup(t)
540         v := s.newVolume(t, factory)
541         defer v.Teardown()
542
543         v.BlockWrite(context.Background(), TestHash, TestBlock)
544         v.BlockWrite(context.Background(), TestHash2, TestBlock2)
545         v.BlockWrite(context.Background(), TestHash3, TestBlock3)
546
547         sem := make(chan int)
548         go func() {
549                 buf := bytes.NewBuffer(nil)
550                 _, err := v.BlockRead(context.Background(), TestHash, buf)
551                 if err != nil {
552                         t.Errorf("err1: %v", err)
553                 }
554                 if buf.String() != string(TestBlock) {
555                         t.Errorf("buf should be %s, is %s", TestBlock, buf)
556                 }
557                 sem <- 1
558         }()
559
560         go func() {
561                 buf := bytes.NewBuffer(nil)
562                 _, err := v.BlockRead(context.Background(), TestHash2, buf)
563                 if err != nil {
564                         t.Errorf("err2: %v", err)
565                 }
566                 if buf.String() != string(TestBlock2) {
567                         t.Errorf("buf should be %s, is %s", TestBlock2, buf)
568                 }
569                 sem <- 1
570         }()
571
572         go func() {
573                 buf := bytes.NewBuffer(nil)
574                 _, err := v.BlockRead(context.Background(), TestHash3, buf)
575                 if err != nil {
576                         t.Errorf("err3: %v", err)
577                 }
578                 if buf.String() != string(TestBlock3) {
579                         t.Errorf("buf should be %s, is %s", TestBlock3, buf)
580                 }
581                 sem <- 1
582         }()
583
584         // Wait for all goroutines to finish
585         for done := 0; done < 3; done++ {
586                 <-sem
587         }
588 }
589
590 // Launch concurrent Puts
591 // Test is intended for only writable volumes
592 func (s *genericVolumeSuite) testPutConcurrent(t TB, factory TestableVolumeFactory) {
593         s.setup(t)
594         v := s.newVolume(t, factory)
595         defer v.Teardown()
596
597         blks := []struct {
598                 hash string
599                 data []byte
600         }{
601                 {hash: TestHash, data: TestBlock},
602                 {hash: TestHash2, data: TestBlock2},
603                 {hash: TestHash3, data: TestBlock3},
604         }
605
606         var wg sync.WaitGroup
607         for _, blk := range blks {
608                 blk := blk
609                 wg.Add(1)
610                 go func() {
611                         defer wg.Done()
612                         err := v.BlockWrite(context.Background(), blk.hash, blk.data)
613                         if err != nil {
614                                 t.Errorf("%s: %v", blk.hash, err)
615                         }
616                 }()
617         }
618         wg.Wait()
619
620         // Check that we actually wrote the blocks.
621         for _, blk := range blks {
622                 buf := bytes.NewBuffer(nil)
623                 _, err := v.BlockRead(context.Background(), blk.hash, buf)
624                 if err != nil {
625                         t.Errorf("get %s: %v", blk.hash, err)
626                 } else if buf.String() != string(blk.data) {
627                         t.Errorf("get %s: expected %s, got %s", blk.hash, blk.data, buf)
628                 }
629         }
630 }
631
632 // Write and read back a full size block
633 func (s *genericVolumeSuite) testPutFullBlock(t TB, factory TestableVolumeFactory) {
634         s.setup(t)
635         v := s.newVolume(t, factory)
636         defer v.Teardown()
637
638         wdata := make([]byte, BlockSize)
639         wdata[0] = 'a'
640         wdata[BlockSize-1] = 'z'
641         hash := fmt.Sprintf("%x", md5.Sum(wdata))
642         err := v.BlockWrite(context.Background(), hash, wdata)
643         if err != nil {
644                 t.Error(err)
645         }
646
647         buf := bytes.NewBuffer(nil)
648         _, err = v.BlockRead(context.Background(), hash, buf)
649         if err != nil {
650                 t.Error(err)
651         }
652         if buf.String() != string(wdata) {
653                 t.Error("buf %+q != wdata %+q", buf, wdata)
654         }
655 }
656
657 // With BlobTrashLifetime != 0, perform:
658 // Trash an old block - which either raises ErrNotImplemented or succeeds
659 // Untrash -  which either raises ErrNotImplemented or succeeds
660 // Get - which must succeed
661 func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory TestableVolumeFactory) {
662         s.setup(t)
663         s.cluster.Collections.BlobTrashLifetime.Set("1h")
664         v := s.newVolume(t, factory)
665         defer v.Teardown()
666
667         // put block and backdate it
668         v.BlockWrite(context.Background(), TestHash, TestBlock)
669         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
670
671         buf := bytes.NewBuffer(nil)
672         _, err := v.BlockRead(context.Background(), TestHash, buf)
673         if err != nil {
674                 t.Error(err)
675         }
676         if buf.String() != string(TestBlock) {
677                 t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
678         }
679
680         // Trash
681         err = v.BlockTrash(TestHash)
682         if err != nil {
683                 t.Error(err)
684                 return
685         }
686         buf.Reset()
687         _, err = v.BlockRead(context.Background(), TestHash, buf)
688         if err == nil || !os.IsNotExist(err) {
689                 t.Errorf("os.IsNotExist(%v) should have been true", err)
690         }
691
692         // Untrash
693         err = v.BlockUntrash(TestHash)
694         if err != nil {
695                 t.Error(err)
696         }
697
698         // Get the block - after trash and untrash sequence
699         buf.Reset()
700         _, err = v.BlockRead(context.Background(), TestHash, buf)
701         if err != nil {
702                 t.Error(err)
703         }
704         if buf.String() != string(TestBlock) {
705                 t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
706         }
707 }
708
709 func (s *genericVolumeSuite) testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
710         s.setup(t)
711         v := s.newVolume(t, factory)
712         defer v.Teardown()
713
714         checkGet := func() error {
715                 buf := bytes.NewBuffer(nil)
716                 _, err := v.BlockRead(context.Background(), TestHash, buf)
717                 if err != nil {
718                         return err
719                 }
720                 if buf.String() != string(TestBlock) {
721                         t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
722                 }
723
724                 _, err = v.Mtime(TestHash)
725                 if err != nil {
726                         return err
727                 }
728
729                 indexBuf := new(bytes.Buffer)
730                 v.Index(context.Background(), "", indexBuf)
731                 if !strings.Contains(string(indexBuf.Bytes()), TestHash) {
732                         return os.ErrNotExist
733                 }
734
735                 return nil
736         }
737
738         // First set: EmptyTrash before reaching the trash deadline.
739
740         s.cluster.Collections.BlobTrashLifetime.Set("1h")
741
742         v.BlockWrite(context.Background(), TestHash, TestBlock)
743         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
744
745         err := checkGet()
746         if err != nil {
747                 t.Error(err)
748         }
749
750         // Trash the block
751         err = v.BlockTrash(TestHash)
752         if err != nil {
753                 t.Error(err)
754         }
755
756         err = checkGet()
757         if err == nil || !os.IsNotExist(err) {
758                 t.Errorf("os.IsNotExist(%v) should have been true", err)
759         }
760
761         err = v.BlockTouch(TestHash)
762         if err == nil || !os.IsNotExist(err) {
763                 t.Errorf("os.IsNotExist(%v) should have been true", err)
764         }
765
766         v.EmptyTrash()
767
768         // Even after emptying the trash, we can untrash our block
769         // because the deadline hasn't been reached.
770         err = v.BlockUntrash(TestHash)
771         if err != nil {
772                 t.Error(err)
773         }
774
775         err = checkGet()
776         if err != nil {
777                 t.Error(err)
778         }
779
780         err = v.BlockTouch(TestHash)
781         if err != nil {
782                 t.Error(err)
783         }
784
785         // Because we Touch'ed, need to backdate again for next set of tests
786         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
787
788         // If the only block in the trash has already been untrashed,
789         // most volumes will fail a subsequent Untrash with a 404, but
790         // it's also acceptable for Untrash to succeed.
791         err = v.BlockUntrash(TestHash)
792         if err != nil && !os.IsNotExist(err) {
793                 t.Errorf("Expected success or os.IsNotExist(), but got: %v", err)
794         }
795
796         // The additional Untrash should not interfere with our
797         // already-untrashed copy.
798         err = checkGet()
799         if err != nil {
800                 t.Error(err)
801         }
802
803         // Untrash might have updated the timestamp, so backdate again
804         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
805
806         // Second set: EmptyTrash after the trash deadline has passed.
807
808         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
809
810         err = v.BlockTrash(TestHash)
811         if err != nil {
812                 t.Error(err)
813         }
814         err = checkGet()
815         if err == nil || !os.IsNotExist(err) {
816                 t.Errorf("os.IsNotExist(%v) should have been true", err)
817         }
818
819         // Even though 1ns has passed, we can untrash because we
820         // haven't called EmptyTrash yet.
821         err = v.BlockUntrash(TestHash)
822         if err != nil {
823                 t.Error(err)
824         }
825         err = checkGet()
826         if err != nil {
827                 t.Error(err)
828         }
829
830         // Trash it again, and this time call EmptyTrash so it really
831         // goes away.
832         // (In Azure volumes, un/trash changes Mtime, so first backdate again)
833         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
834         _ = v.BlockTrash(TestHash)
835         err = checkGet()
836         if err == nil || !os.IsNotExist(err) {
837                 t.Errorf("os.IsNotExist(%v) should have been true", err)
838         }
839         v.EmptyTrash()
840
841         // Untrash won't find it
842         err = v.BlockUntrash(TestHash)
843         if err == nil || !os.IsNotExist(err) {
844                 t.Errorf("os.IsNotExist(%v) should have been true", err)
845         }
846
847         // Get block won't find it
848         err = checkGet()
849         if err == nil || !os.IsNotExist(err) {
850                 t.Errorf("os.IsNotExist(%v) should have been true", err)
851         }
852
853         // Third set: If the same data block gets written again after
854         // being trashed, and then the trash gets emptied, the newer
855         // un-trashed copy doesn't get deleted along with it.
856
857         v.BlockWrite(context.Background(), TestHash, TestBlock)
858         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
859
860         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
861         err = v.BlockTrash(TestHash)
862         if err != nil {
863                 t.Error(err)
864         }
865         err = checkGet()
866         if err == nil || !os.IsNotExist(err) {
867                 t.Errorf("os.IsNotExist(%v) should have been true", err)
868         }
869
870         v.BlockWrite(context.Background(), TestHash, TestBlock)
871         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
872
873         // EmptyTrash should not delete the untrashed copy.
874         v.EmptyTrash()
875         err = checkGet()
876         if err != nil {
877                 t.Error(err)
878         }
879
880         // Fourth set: If the same data block gets trashed twice with
881         // different deadlines A and C, and then the trash is emptied
882         // at intermediate time B (A < B < C), it is still possible to
883         // untrash the block whose deadline is "C".
884
885         v.BlockWrite(context.Background(), TestHash, TestBlock)
886         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
887
888         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
889         err = v.BlockTrash(TestHash)
890         if err != nil {
891                 t.Error(err)
892         }
893
894         v.BlockWrite(context.Background(), TestHash, TestBlock)
895         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
896
897         s.cluster.Collections.BlobTrashLifetime.Set("1h")
898         err = v.BlockTrash(TestHash)
899         if err != nil {
900                 t.Error(err)
901         }
902
903         // EmptyTrash should not prevent us from recovering the
904         // time.Hour ("C") trash
905         v.EmptyTrash()
906         err = v.BlockUntrash(TestHash)
907         if err != nil {
908                 t.Error(err)
909         }
910         err = checkGet()
911         if err != nil {
912                 t.Error(err)
913         }
914 }