Merge branch '15317-metrics'
[arvados.git] / services / keepstore / volume_generic_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "os"
13         "regexp"
14         "sort"
15         "strconv"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/arvadostest"
22         "git.arvados.org/arvados.git/sdk/go/ctxlog"
23         "github.com/prometheus/client_golang/prometheus"
24         dto "github.com/prometheus/client_model/go"
25         "github.com/sirupsen/logrus"
26 )
27
28 type TB interface {
29         Error(args ...interface{})
30         Errorf(format string, args ...interface{})
31         Fail()
32         FailNow()
33         Failed() bool
34         Fatal(args ...interface{})
35         Fatalf(format string, args ...interface{})
36         Log(args ...interface{})
37         Logf(format string, args ...interface{})
38 }
39
40 // A TestableVolumeFactory returns a new TestableVolume. The factory
41 // function, and the TestableVolume it returns, can use "t" to write
42 // logs, fail the current test, etc.
43 type TestableVolumeFactory func(t TB, params newVolumeParams) TestableVolume
44
45 // DoGenericVolumeTests runs a set of tests that every TestableVolume
46 // is expected to pass. It calls factory to create a new TestableVolume
47 // for each test case, to avoid leaking state between tests.
48 func DoGenericVolumeTests(t TB, readonly bool, factory TestableVolumeFactory) {
49         var s genericVolumeSuite
50         s.volume.ReadOnly = readonly
51
52         s.testGet(t, factory)
53         s.testGetNoSuchBlock(t, factory)
54
55         if !readonly {
56                 s.testPutBlockWithSameContent(t, factory, TestHash, TestBlock)
57                 s.testPutBlockWithSameContent(t, factory, EmptyHash, EmptyBlock)
58                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], arvadostest.MD5CollisionData[1])
59                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, EmptyBlock, arvadostest.MD5CollisionData[0])
60                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], EmptyBlock)
61                 s.testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, arvadostest.MD5CollisionData[0])
62                 s.testPutMultipleBlocks(t, factory)
63
64                 s.testPutAndTouch(t, factory)
65         }
66         s.testTouchNoSuchBlock(t, factory)
67
68         s.testMtimeNoSuchBlock(t, factory)
69
70         s.testIndex(t, factory)
71
72         if !readonly {
73                 s.testDeleteNewBlock(t, factory)
74                 s.testDeleteOldBlock(t, factory)
75         }
76         s.testDeleteNoSuchBlock(t, factory)
77
78         s.testMetrics(t, readonly, factory)
79
80         s.testGetConcurrent(t, factory)
81         if !readonly {
82                 s.testPutConcurrent(t, factory)
83                 s.testPutFullBlock(t, factory)
84                 s.testTrashUntrash(t, readonly, factory)
85                 s.testTrashEmptyTrashUntrash(t, factory)
86         }
87 }
88
89 type genericVolumeSuite struct {
90         cluster    *arvados.Cluster
91         volume     arvados.Volume
92         logger     logrus.FieldLogger
93         metrics    *volumeMetricsVecs
94         registry   *prometheus.Registry
95         bufferPool *bufferPool
96 }
97
98 func (s *genericVolumeSuite) setup(t TB) {
99         s.cluster = testCluster(t)
100         s.logger = ctxlog.TestLogger(t)
101         s.registry = prometheus.NewRegistry()
102         s.metrics = newVolumeMetricsVecs(s.registry)
103         s.bufferPool = newBufferPool(s.logger, 8, s.registry)
104 }
105
106 func (s *genericVolumeSuite) newVolume(t TB, factory TestableVolumeFactory) TestableVolume {
107         return factory(t, newVolumeParams{
108                 UUID:         "zzzzz-nyw5e-999999999999999",
109                 Cluster:      s.cluster,
110                 ConfigVolume: s.volume,
111                 Logger:       s.logger,
112                 MetricsVecs:  s.metrics,
113                 BufferPool:   s.bufferPool,
114         })
115 }
116
117 // Put a test block, get it and verify content
118 // Test should pass for both writable and read-only volumes
119 func (s *genericVolumeSuite) testGet(t TB, factory TestableVolumeFactory) {
120         s.setup(t)
121         v := s.newVolume(t, factory)
122         defer v.Teardown()
123
124         err := v.BlockWrite(context.Background(), TestHash, TestBlock)
125         if err != nil {
126                 t.Error(err)
127         }
128
129         buf := &brbuffer{}
130         err = v.BlockRead(context.Background(), TestHash, buf)
131         if err != nil {
132                 t.Error(err)
133         }
134         if bytes.Compare(buf.Bytes(), TestBlock) != 0 {
135                 t.Errorf("expected %s, got %s", "foo", buf.String())
136         }
137 }
138
139 // Invoke get on a block that does not exist in volume; should result in error
140 // Test should pass for both writable and read-only volumes
141 func (s *genericVolumeSuite) testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
142         s.setup(t)
143         v := s.newVolume(t, factory)
144         defer v.Teardown()
145
146         if err := v.BlockRead(context.Background(), barHash, brdiscard); err == nil {
147                 t.Errorf("Expected error while getting non-existing block %v", barHash)
148         }
149 }
150
151 // Put a block and put again with same content
152 // Test is intended for only writable volumes
153 func (s *genericVolumeSuite) testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
154         s.setup(t)
155         v := s.newVolume(t, factory)
156         defer v.Teardown()
157
158         err := v.BlockWrite(context.Background(), testHash, testData)
159         if err != nil {
160                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
161         }
162
163         err = v.BlockWrite(context.Background(), testHash, testData)
164         if err != nil {
165                 t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
166         }
167 }
168
169 // Put a block and put again with different content
170 // Test is intended for only writable volumes
171 func (s *genericVolumeSuite) testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
172         s.setup(t)
173         v := s.newVolume(t, factory)
174         defer v.Teardown()
175
176         v.BlockWrite(context.Background(), testHash, testDataA)
177
178         putErr := v.BlockWrite(context.Background(), testHash, testDataB)
179         buf := &brbuffer{}
180         getErr := v.BlockRead(context.Background(), testHash, buf)
181         if putErr == nil {
182                 // Put must not return a nil error unless it has
183                 // overwritten the existing data.
184                 if buf.String() != string(testDataB) {
185                         t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
186                 }
187         } else {
188                 // It is permissible for Put to fail, but it must
189                 // leave us with either the original data, the new
190                 // data, or nothing at all.
191                 if getErr == nil && buf.String() != string(testDataA) && buf.String() != string(testDataB) {
192                         t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
193                 }
194         }
195 }
196
197 // Put and get multiple blocks
198 // Test is intended for only writable volumes
199 func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
200         s.setup(t)
201         v := s.newVolume(t, factory)
202         defer v.Teardown()
203
204         err := v.BlockWrite(context.Background(), TestHash, TestBlock)
205         if err != nil {
206                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
207         }
208
209         err = v.BlockWrite(context.Background(), TestHash2, TestBlock2)
210         if err != nil {
211                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
212         }
213
214         err = v.BlockWrite(context.Background(), TestHash3, TestBlock3)
215         if err != nil {
216                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
217         }
218
219         buf := &brbuffer{}
220         err = v.BlockRead(context.Background(), TestHash, buf)
221         if err != nil {
222                 t.Error(err)
223         } else {
224                 if bytes.Compare(buf.Bytes(), TestBlock) != 0 {
225                         t.Errorf("Block present, but got %+q, expected %+q", buf, TestBlock)
226                 }
227         }
228
229         buf.Reset()
230         err = v.BlockRead(context.Background(), TestHash2, buf)
231         if err != nil {
232                 t.Error(err)
233         } else {
234                 if bytes.Compare(buf.Bytes(), TestBlock2) != 0 {
235                         t.Errorf("Block present, but got %+q, expected %+q", buf, TestBlock2)
236                 }
237         }
238
239         buf.Reset()
240         err = v.BlockRead(context.Background(), TestHash3, buf)
241         if err != nil {
242                 t.Error(err)
243         } else {
244                 if bytes.Compare(buf.Bytes(), TestBlock3) != 0 {
245                         t.Errorf("Block present, but to %+q, expected %+q", buf, TestBlock3)
246                 }
247         }
248 }
249
250 // testPutAndTouch checks that when applying PUT to a block that
251 // already exists, the block's modification time is updated.  Intended
252 // for only writable volumes.
253 func (s *genericVolumeSuite) testPutAndTouch(t TB, factory TestableVolumeFactory) {
254         s.setup(t)
255         v := s.newVolume(t, factory)
256         defer v.Teardown()
257
258         if err := v.BlockWrite(context.Background(), TestHash, TestBlock); err != nil {
259                 t.Error(err)
260         }
261
262         // We'll verify { t0 < threshold < t1 }, where t0 is the
263         // existing block's timestamp on disk before BlockWrite() and t1 is
264         // its timestamp after BlockWrite().
265         threshold := time.Now().Add(-time.Second)
266
267         // Set the stored block's mtime far enough in the past that we
268         // can see the difference between "timestamp didn't change"
269         // and "timestamp granularity is too low".
270         v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
271
272         // Make sure v.Mtime() agrees the above Utime really worked.
273         if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
274                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
275         }
276
277         // Write the same block again.
278         if err := v.BlockWrite(context.Background(), TestHash, TestBlock); err != nil {
279                 t.Error(err)
280         }
281
282         // Verify threshold < t1
283         if t1, err := v.Mtime(TestHash); err != nil {
284                 t.Error(err)
285         } else if t1.Before(threshold) {
286                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
287         }
288 }
289
290 // Touching a non-existing block should result in error.
291 // Test should pass for both writable and read-only volumes
292 func (s *genericVolumeSuite) testTouchNoSuchBlock(t TB, factory TestableVolumeFactory) {
293         s.setup(t)
294         v := s.newVolume(t, factory)
295         defer v.Teardown()
296
297         if err := v.BlockTouch(TestHash); err == nil {
298                 t.Error("Expected error when attempted to touch a non-existing block")
299         }
300 }
301
302 // Invoking Mtime on a non-existing block should result in error.
303 // Test should pass for both writable and read-only volumes
304 func (s *genericVolumeSuite) testMtimeNoSuchBlock(t TB, factory TestableVolumeFactory) {
305         s.setup(t)
306         v := s.newVolume(t, factory)
307         defer v.Teardown()
308
309         if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
310                 t.Error("Expected error when updating Mtime on a non-existing block")
311         }
312 }
313
314 // Put a few blocks and invoke Index with:
315 // * no prefix
316 // * with a prefix
317 // * with no such prefix
318 // Test should pass for both writable and read-only volumes
319 func (s *genericVolumeSuite) testIndex(t TB, factory TestableVolumeFactory) {
320         s.setup(t)
321         v := s.newVolume(t, factory)
322         defer v.Teardown()
323
324         // minMtime and maxMtime are the minimum and maximum
325         // acceptable values the index can report for our test
326         // blocks. 1-second precision is acceptable.
327         minMtime := time.Now().UTC().UnixNano()
328         minMtime -= minMtime % 1e9
329
330         v.BlockWrite(context.Background(), TestHash, TestBlock)
331         v.BlockWrite(context.Background(), TestHash2, TestBlock2)
332         v.BlockWrite(context.Background(), TestHash3, TestBlock3)
333
334         maxMtime := time.Now().UTC().UnixNano()
335         if maxMtime%1e9 > 0 {
336                 maxMtime -= maxMtime % 1e9
337                 maxMtime += 1e9
338         }
339
340         // Blocks whose names aren't Keep hashes should be omitted from
341         // index
342         v.BlockWrite(context.Background(), "fffffffffnotreallyahashfffffffff", nil)
343         v.BlockWrite(context.Background(), "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
344         v.BlockWrite(context.Background(), "f0000000000000000000000000000000f", nil)
345         v.BlockWrite(context.Background(), "f00", nil)
346
347         buf := new(bytes.Buffer)
348         v.Index(context.Background(), "", buf)
349         indexRows := strings.Split(string(buf.Bytes()), "\n")
350         sort.Strings(indexRows)
351         sortedIndex := strings.Join(indexRows, "\n")
352         m := regexp.MustCompile(
353                 `^\n` + TestHash + `\+\d+ (\d+)\n` +
354                         TestHash3 + `\+\d+ \d+\n` +
355                         TestHash2 + `\+\d+ \d+$`,
356         ).FindStringSubmatch(sortedIndex)
357         if m == nil {
358                 t.Errorf("Got index %q for empty prefix", sortedIndex)
359         } else {
360                 mtime, err := strconv.ParseInt(m[1], 10, 64)
361                 if err != nil {
362                         t.Error(err)
363                 } else if mtime < minMtime || mtime > maxMtime {
364                         t.Errorf("got %d for TestHash timestamp, expected %d <= t <= %d",
365                                 mtime, minMtime, maxMtime)
366                 }
367         }
368
369         for _, prefix := range []string{"f", "f15", "f15ac"} {
370                 buf = new(bytes.Buffer)
371                 v.Index(context.Background(), prefix, buf)
372
373                 m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
374                 if err != nil {
375                         t.Error(err)
376                 } else if !m {
377                         t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
378                 }
379         }
380
381         for _, prefix := range []string{"zero", "zip", "zilch"} {
382                 buf = new(bytes.Buffer)
383                 err := v.Index(context.Background(), prefix, buf)
384                 if err != nil {
385                         t.Errorf("Got error on Index with no such prefix %v", err.Error())
386                 } else if buf.Len() != 0 {
387                         t.Errorf("Expected empty list for Index with no such prefix %s", prefix)
388                 }
389         }
390 }
391
392 // Calling Delete() for a block immediately after writing it (not old enough)
393 // should neither delete the data nor return an error.
394 // Test is intended for only writable volumes
395 func (s *genericVolumeSuite) testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
396         s.setup(t)
397         s.cluster.Collections.BlobSigningTTL.Set("5m")
398         v := s.newVolume(t, factory)
399         defer v.Teardown()
400
401         v.BlockWrite(context.Background(), TestHash, TestBlock)
402
403         if err := v.BlockTrash(TestHash); err != nil {
404                 t.Error(err)
405         }
406         buf := &brbuffer{}
407         err := v.BlockRead(context.Background(), TestHash, buf)
408         if err != nil {
409                 t.Error(err)
410         } else if buf.String() != string(TestBlock) {
411                 t.Errorf("Got data %+q, expected %+q", buf.String(), TestBlock)
412         }
413 }
414
415 // Calling Delete() for a block with a timestamp older than
416 // BlobSigningTTL seconds in the past should delete the data.  Test is
417 // intended for only writable volumes
418 func (s *genericVolumeSuite) testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
419         s.setup(t)
420         s.cluster.Collections.BlobSigningTTL.Set("5m")
421         v := s.newVolume(t, factory)
422         defer v.Teardown()
423
424         v.BlockWrite(context.Background(), TestHash, TestBlock)
425         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
426
427         if err := v.BlockTrash(TestHash); err != nil {
428                 t.Error(err)
429         }
430         if err := v.BlockRead(context.Background(), TestHash, brdiscard); err == nil || !os.IsNotExist(err) {
431                 t.Errorf("os.IsNotExist(%v) should have been true", err)
432         }
433
434         _, err := v.Mtime(TestHash)
435         if err == nil || !os.IsNotExist(err) {
436                 t.Errorf("os.IsNotExist(%v) should have been true", err)
437         }
438
439         indexBuf := new(bytes.Buffer)
440         v.Index(context.Background(), "", indexBuf)
441         if strings.Contains(string(indexBuf.Bytes()), TestHash) {
442                 t.Errorf("Found trashed block in Index")
443         }
444
445         err = v.BlockTouch(TestHash)
446         if err == nil || !os.IsNotExist(err) {
447                 t.Errorf("os.IsNotExist(%v) should have been true", err)
448         }
449 }
450
451 // Calling Delete() for a block that does not exist should result in error.
452 // Test should pass for both writable and read-only volumes
453 func (s *genericVolumeSuite) testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
454         s.setup(t)
455         v := s.newVolume(t, factory)
456         defer v.Teardown()
457
458         if err := v.BlockTrash(TestHash2); err == nil {
459                 t.Errorf("Expected error when attempting to delete a non-existing block")
460         }
461 }
462
463 func getValueFrom(cv *prometheus.CounterVec, lbls prometheus.Labels) float64 {
464         c, _ := cv.GetMetricWith(lbls)
465         pb := &dto.Metric{}
466         c.Write(pb)
467         return pb.GetCounter().GetValue()
468 }
469
470 func (s *genericVolumeSuite) testMetrics(t TB, readonly bool, factory TestableVolumeFactory) {
471         var err error
472
473         s.setup(t)
474         v := s.newVolume(t, factory)
475         defer v.Teardown()
476
477         opsC, _, ioC := s.metrics.getCounterVecsFor(prometheus.Labels{"device_id": v.DeviceID()})
478
479         if ioC == nil {
480                 t.Error("ioBytes CounterVec is nil")
481                 return
482         }
483
484         if getValueFrom(ioC, prometheus.Labels{"direction": "out"})+
485                 getValueFrom(ioC, prometheus.Labels{"direction": "in"}) > 0 {
486                 t.Error("ioBytes counter should be zero")
487         }
488
489         if opsC == nil {
490                 t.Error("opsCounter CounterVec is nil")
491                 return
492         }
493
494         var c, writeOpCounter, readOpCounter float64
495
496         readOpType, writeOpType := v.ReadWriteOperationLabelValues()
497         writeOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
498         readOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
499
500         // Test Put if volume is writable
501         if !readonly {
502                 err = v.BlockWrite(context.Background(), TestHash, TestBlock)
503                 if err != nil {
504                         t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
505                 }
506                 // Check that the write operations counter increased
507                 c = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
508                 if c <= writeOpCounter {
509                         t.Error("Operation(s) not counted on Put")
510                 }
511                 // Check that bytes counter is > 0
512                 if getValueFrom(ioC, prometheus.Labels{"direction": "out"}) == 0 {
513                         t.Error("ioBytes{direction=out} counter shouldn't be zero")
514                 }
515         } else {
516                 v.BlockWrite(context.Background(), TestHash, TestBlock)
517         }
518
519         err = v.BlockRead(context.Background(), TestHash, brdiscard)
520         if err != nil {
521                 t.Error(err)
522         }
523
524         // Check that the operations counter increased
525         c = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
526         if c <= readOpCounter {
527                 t.Error("Operation(s) not counted on Get")
528         }
529         // Check that the bytes "in" counter is > 0
530         if getValueFrom(ioC, prometheus.Labels{"direction": "in"}) == 0 {
531                 t.Error("ioBytes{direction=in} counter shouldn't be zero")
532         }
533 }
534
535 // Launch concurrent Gets
536 // Test should pass for both writable and read-only volumes
537 func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFactory) {
538         s.setup(t)
539         v := s.newVolume(t, factory)
540         defer v.Teardown()
541
542         v.BlockWrite(context.Background(), TestHash, TestBlock)
543         v.BlockWrite(context.Background(), TestHash2, TestBlock2)
544         v.BlockWrite(context.Background(), TestHash3, TestBlock3)
545
546         sem := make(chan int)
547         go func() {
548                 buf := &brbuffer{}
549                 err := v.BlockRead(context.Background(), TestHash, buf)
550                 if err != nil {
551                         t.Errorf("err1: %v", err)
552                 }
553                 if buf.String() != string(TestBlock) {
554                         t.Errorf("buf should be %s, is %s", TestBlock, buf)
555                 }
556                 sem <- 1
557         }()
558
559         go func() {
560                 buf := &brbuffer{}
561                 err := v.BlockRead(context.Background(), TestHash2, buf)
562                 if err != nil {
563                         t.Errorf("err2: %v", err)
564                 }
565                 if buf.String() != string(TestBlock2) {
566                         t.Errorf("buf should be %s, is %s", TestBlock2, buf)
567                 }
568                 sem <- 1
569         }()
570
571         go func() {
572                 buf := &brbuffer{}
573                 err := v.BlockRead(context.Background(), TestHash3, buf)
574                 if err != nil {
575                         t.Errorf("err3: %v", err)
576                 }
577                 if buf.String() != string(TestBlock3) {
578                         t.Errorf("buf should be %s, is %s", TestBlock3, buf)
579                 }
580                 sem <- 1
581         }()
582
583         // Wait for all goroutines to finish
584         for done := 0; done < 3; done++ {
585                 <-sem
586         }
587 }
588
589 // Launch concurrent Puts
590 // Test is intended for only writable volumes
591 func (s *genericVolumeSuite) testPutConcurrent(t TB, factory TestableVolumeFactory) {
592         s.setup(t)
593         v := s.newVolume(t, factory)
594         defer v.Teardown()
595
596         blks := []struct {
597                 hash string
598                 data []byte
599         }{
600                 {hash: TestHash, data: TestBlock},
601                 {hash: TestHash2, data: TestBlock2},
602                 {hash: TestHash3, data: TestBlock3},
603         }
604
605         var wg sync.WaitGroup
606         for _, blk := range blks {
607                 blk := blk
608                 wg.Add(1)
609                 go func() {
610                         defer wg.Done()
611                         err := v.BlockWrite(context.Background(), blk.hash, blk.data)
612                         if err != nil {
613                                 t.Errorf("%s: %v", blk.hash, err)
614                         }
615                 }()
616         }
617         wg.Wait()
618
619         // Check that we actually wrote the blocks.
620         for _, blk := range blks {
621                 buf := &brbuffer{}
622                 err := v.BlockRead(context.Background(), blk.hash, buf)
623                 if err != nil {
624                         t.Errorf("get %s: %v", blk.hash, err)
625                 } else if buf.String() != string(blk.data) {
626                         t.Errorf("get %s: expected %s, got %s", blk.hash, blk.data, buf)
627                 }
628         }
629 }
630
631 // Write and read back a full size block
632 func (s *genericVolumeSuite) testPutFullBlock(t TB, factory TestableVolumeFactory) {
633         s.setup(t)
634         v := s.newVolume(t, factory)
635         defer v.Teardown()
636
637         wdata := make([]byte, BlockSize)
638         wdata[0] = 'a'
639         wdata[BlockSize-1] = 'z'
640         hash := fmt.Sprintf("%x", md5.Sum(wdata))
641         err := v.BlockWrite(context.Background(), hash, wdata)
642         if err != nil {
643                 t.Error(err)
644         }
645
646         buf := &brbuffer{}
647         err = v.BlockRead(context.Background(), hash, buf)
648         if err != nil {
649                 t.Error(err)
650         }
651         if buf.String() != string(wdata) {
652                 t.Errorf("buf (len %d) != wdata (len %d)", buf.Len(), len(wdata))
653         }
654 }
655
656 // With BlobTrashLifetime != 0, perform:
657 // Trash an old block - which either raises ErrNotImplemented or succeeds
658 // Untrash -  which either raises ErrNotImplemented or succeeds
659 // Get - which must succeed
660 func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory TestableVolumeFactory) {
661         s.setup(t)
662         s.cluster.Collections.BlobTrashLifetime.Set("1h")
663         v := s.newVolume(t, factory)
664         defer v.Teardown()
665
666         // put block and backdate it
667         v.BlockWrite(context.Background(), TestHash, TestBlock)
668         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
669
670         buf := &brbuffer{}
671         err := v.BlockRead(context.Background(), TestHash, buf)
672         if err != nil {
673                 t.Error(err)
674         }
675         if buf.String() != string(TestBlock) {
676                 t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
677         }
678
679         // Trash
680         err = v.BlockTrash(TestHash)
681         if err != nil {
682                 t.Error(err)
683                 return
684         }
685         buf.Reset()
686         err = v.BlockRead(context.Background(), TestHash, buf)
687         if err == nil || !os.IsNotExist(err) {
688                 t.Errorf("os.IsNotExist(%v) should have been true", err)
689         }
690
691         // Untrash
692         err = v.BlockUntrash(TestHash)
693         if err != nil {
694                 t.Error(err)
695         }
696
697         // Get the block - after trash and untrash sequence
698         buf.Reset()
699         err = v.BlockRead(context.Background(), TestHash, buf)
700         if err != nil {
701                 t.Error(err)
702         }
703         if buf.String() != string(TestBlock) {
704                 t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
705         }
706 }
707
708 func (s *genericVolumeSuite) testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
709         s.setup(t)
710         v := s.newVolume(t, factory)
711         defer v.Teardown()
712
713         checkGet := func() error {
714                 buf := &brbuffer{}
715                 err := v.BlockRead(context.Background(), TestHash, buf)
716                 if err != nil {
717                         return err
718                 }
719                 if buf.String() != string(TestBlock) {
720                         t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
721                 }
722
723                 _, err = v.Mtime(TestHash)
724                 if err != nil {
725                         return err
726                 }
727
728                 indexBuf := new(bytes.Buffer)
729                 v.Index(context.Background(), "", indexBuf)
730                 if !strings.Contains(string(indexBuf.Bytes()), TestHash) {
731                         return os.ErrNotExist
732                 }
733
734                 return nil
735         }
736
737         // First set: EmptyTrash before reaching the trash deadline.
738
739         s.cluster.Collections.BlobTrashLifetime.Set("1h")
740
741         v.BlockWrite(context.Background(), TestHash, TestBlock)
742         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
743
744         err := checkGet()
745         if err != nil {
746                 t.Error(err)
747         }
748
749         // Trash the block
750         err = v.BlockTrash(TestHash)
751         if err != nil {
752                 t.Error(err)
753         }
754
755         err = checkGet()
756         if err == nil || !os.IsNotExist(err) {
757                 t.Errorf("os.IsNotExist(%v) should have been true", err)
758         }
759
760         err = v.BlockTouch(TestHash)
761         if err == nil || !os.IsNotExist(err) {
762                 t.Errorf("os.IsNotExist(%v) should have been true", err)
763         }
764
765         v.EmptyTrash()
766
767         // Even after emptying the trash, we can untrash our block
768         // because the deadline hasn't been reached.
769         err = v.BlockUntrash(TestHash)
770         if err != nil {
771                 t.Error(err)
772         }
773
774         err = checkGet()
775         if err != nil {
776                 t.Error(err)
777         }
778
779         err = v.BlockTouch(TestHash)
780         if err != nil {
781                 t.Error(err)
782         }
783
784         // Because we Touch'ed, need to backdate again for next set of tests
785         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
786
787         // If the only block in the trash has already been untrashed,
788         // most volumes will fail a subsequent Untrash with a 404, but
789         // it's also acceptable for Untrash to succeed.
790         err = v.BlockUntrash(TestHash)
791         if err != nil && !os.IsNotExist(err) {
792                 t.Errorf("Expected success or os.IsNotExist(), but got: %v", err)
793         }
794
795         // The additional Untrash should not interfere with our
796         // already-untrashed copy.
797         err = checkGet()
798         if err != nil {
799                 t.Error(err)
800         }
801
802         // Untrash might have updated the timestamp, so backdate again
803         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
804
805         // Second set: EmptyTrash after the trash deadline has passed.
806
807         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
808
809         err = v.BlockTrash(TestHash)
810         if err != nil {
811                 t.Error(err)
812         }
813         err = checkGet()
814         if err == nil || !os.IsNotExist(err) {
815                 t.Errorf("os.IsNotExist(%v) should have been true", err)
816         }
817
818         // Even though 1ns has passed, we can untrash because we
819         // haven't called EmptyTrash yet.
820         err = v.BlockUntrash(TestHash)
821         if err != nil {
822                 t.Error(err)
823         }
824         err = checkGet()
825         if err != nil {
826                 t.Error(err)
827         }
828
829         // Trash it again, and this time call EmptyTrash so it really
830         // goes away.
831         // (In Azure volumes, un/trash changes Mtime, so first backdate again)
832         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
833         _ = v.BlockTrash(TestHash)
834         err = checkGet()
835         if err == nil || !os.IsNotExist(err) {
836                 t.Errorf("os.IsNotExist(%v) should have been true", err)
837         }
838         v.EmptyTrash()
839
840         // Untrash won't find it
841         err = v.BlockUntrash(TestHash)
842         if err == nil || !os.IsNotExist(err) {
843                 t.Errorf("os.IsNotExist(%v) should have been true", err)
844         }
845
846         // Get block won't find it
847         err = checkGet()
848         if err == nil || !os.IsNotExist(err) {
849                 t.Errorf("os.IsNotExist(%v) should have been true", err)
850         }
851
852         // Third set: If the same data block gets written again after
853         // being trashed, and then the trash gets emptied, the newer
854         // un-trashed copy doesn't get deleted along with it.
855
856         v.BlockWrite(context.Background(), TestHash, TestBlock)
857         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
858
859         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
860         err = v.BlockTrash(TestHash)
861         if err != nil {
862                 t.Error(err)
863         }
864         err = checkGet()
865         if err == nil || !os.IsNotExist(err) {
866                 t.Errorf("os.IsNotExist(%v) should have been true", err)
867         }
868
869         v.BlockWrite(context.Background(), TestHash, TestBlock)
870         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
871
872         // EmptyTrash should not delete the untrashed copy.
873         v.EmptyTrash()
874         err = checkGet()
875         if err != nil {
876                 t.Error(err)
877         }
878
879         // Fourth set: If the same data block gets trashed twice with
880         // different deadlines A and C, and then the trash is emptied
881         // at intermediate time B (A < B < C), it is still possible to
882         // untrash the block whose deadline is "C".
883
884         v.BlockWrite(context.Background(), TestHash, TestBlock)
885         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
886
887         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
888         err = v.BlockTrash(TestHash)
889         if err != nil {
890                 t.Error(err)
891         }
892
893         v.BlockWrite(context.Background(), TestHash, TestBlock)
894         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
895
896         s.cluster.Collections.BlobTrashLifetime.Set("1h")
897         err = v.BlockTrash(TestHash)
898         if err != nil {
899                 t.Error(err)
900         }
901
902         // EmptyTrash should not prevent us from recovering the
903         // time.Hour ("C") trash
904         v.EmptyTrash()
905         err = v.BlockUntrash(TestHash)
906         if err != nil {
907                 t.Error(err)
908         }
909         err = checkGet()
910         if err != nil {
911                 t.Error(err)
912         }
913 }