21804124316fe2ea9421c25eeeec14c9aab190aa
[arvados.git] / services / keepstore / volume_generic_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "os"
13         "regexp"
14         "sort"
15         "strconv"
16         "strings"
17         "time"
18
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadostest"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/prometheus/client_golang/prometheus"
23         dto "github.com/prometheus/client_model/go"
24         "github.com/sirupsen/logrus"
25 )
26
27 type TB interface {
28         Error(args ...interface{})
29         Errorf(format string, args ...interface{})
30         Fail()
31         FailNow()
32         Failed() bool
33         Fatal(args ...interface{})
34         Fatalf(format string, args ...interface{})
35         Log(args ...interface{})
36         Logf(format string, args ...interface{})
37 }
38
39 // A TestableVolumeFactory returns a new TestableVolume. The factory
40 // function, and the TestableVolume it returns, can use "t" to write
41 // logs, fail the current test, etc.
42 type TestableVolumeFactory func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume
43
44 // DoGenericVolumeTests runs a set of tests that every TestableVolume
45 // is expected to pass. It calls factory to create a new TestableVolume
46 // for each test case, to avoid leaking state between tests.
47 func DoGenericVolumeTests(t TB, readonly bool, factory TestableVolumeFactory) {
48         var s genericVolumeSuite
49         s.volume.ReadOnly = readonly
50
51         s.testGet(t, factory)
52         s.testGetNoSuchBlock(t, factory)
53
54         s.testCompareNonexistent(t, factory)
55         s.testCompareSameContent(t, factory, TestHash, TestBlock)
56         s.testCompareSameContent(t, factory, EmptyHash, EmptyBlock)
57         s.testCompareWithCollision(t, factory, TestHash, TestBlock, []byte("baddata"))
58         s.testCompareWithCollision(t, factory, TestHash, TestBlock, EmptyBlock)
59         s.testCompareWithCollision(t, factory, EmptyHash, EmptyBlock, TestBlock)
60         s.testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, []byte("baddata"))
61         s.testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, EmptyBlock)
62         s.testCompareWithCorruptStoredData(t, factory, EmptyHash, EmptyBlock, []byte("baddata"))
63
64         if !readonly {
65                 s.testPutBlockWithSameContent(t, factory, TestHash, TestBlock)
66                 s.testPutBlockWithSameContent(t, factory, EmptyHash, EmptyBlock)
67                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], arvadostest.MD5CollisionData[1])
68                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, EmptyBlock, arvadostest.MD5CollisionData[0])
69                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], EmptyBlock)
70                 s.testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, arvadostest.MD5CollisionData[0])
71                 s.testPutMultipleBlocks(t, factory)
72
73                 s.testPutAndTouch(t, factory)
74         }
75         s.testTouchNoSuchBlock(t, factory)
76
77         s.testMtimeNoSuchBlock(t, factory)
78
79         s.testIndexTo(t, factory)
80
81         if !readonly {
82                 s.testDeleteNewBlock(t, factory)
83                 s.testDeleteOldBlock(t, factory)
84         }
85         s.testDeleteNoSuchBlock(t, factory)
86
87         s.testStatus(t, factory)
88
89         s.testMetrics(t, readonly, factory)
90
91         s.testString(t, factory)
92
93         if readonly {
94                 s.testUpdateReadOnly(t, factory)
95         }
96
97         s.testGetConcurrent(t, factory)
98         if !readonly {
99                 s.testPutConcurrent(t, factory)
100
101                 s.testPutFullBlock(t, factory)
102         }
103
104         s.testTrashUntrash(t, readonly, factory)
105         s.testTrashEmptyTrashUntrash(t, factory)
106 }
107
108 type genericVolumeSuite struct {
109         cluster  *arvados.Cluster
110         volume   arvados.Volume
111         logger   logrus.FieldLogger
112         metrics  *volumeMetricsVecs
113         registry *prometheus.Registry
114 }
115
116 func (s *genericVolumeSuite) setup(t TB) {
117         s.cluster = testCluster(t)
118         s.logger = ctxlog.TestLogger(t)
119         s.registry = prometheus.NewRegistry()
120         s.metrics = newVolumeMetricsVecs(s.registry)
121 }
122
123 func (s *genericVolumeSuite) newVolume(t TB, factory TestableVolumeFactory) TestableVolume {
124         return factory(t, s.cluster, s.volume, s.logger, s.metrics)
125 }
126
127 // Put a test block, get it and verify content
128 // Test should pass for both writable and read-only volumes
129 func (s *genericVolumeSuite) testGet(t TB, factory TestableVolumeFactory) {
130         s.setup(t)
131         v := s.newVolume(t, factory)
132         defer v.Teardown()
133
134         v.PutRaw(TestHash, TestBlock)
135
136         buf := make([]byte, BlockSize)
137         n, err := v.Get(context.Background(), TestHash, buf)
138         if err != nil {
139                 t.Fatal(err)
140         }
141
142         if bytes.Compare(buf[:n], TestBlock) != 0 {
143                 t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
144         }
145 }
146
147 // Invoke get on a block that does not exist in volume; should result in error
148 // Test should pass for both writable and read-only volumes
149 func (s *genericVolumeSuite) testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
150         s.setup(t)
151         v := s.newVolume(t, factory)
152         defer v.Teardown()
153
154         buf := make([]byte, BlockSize)
155         if _, err := v.Get(context.Background(), TestHash2, buf); err == nil {
156                 t.Errorf("Expected error while getting non-existing block %v", TestHash2)
157         }
158 }
159
160 // Compare() should return os.ErrNotExist if the block does not exist.
161 // Otherwise, writing new data causes CompareAndTouch() to generate
162 // error logs even though everything is working fine.
163 func (s *genericVolumeSuite) testCompareNonexistent(t TB, factory TestableVolumeFactory) {
164         s.setup(t)
165         v := s.newVolume(t, factory)
166         defer v.Teardown()
167
168         err := v.Compare(context.Background(), TestHash, TestBlock)
169         if err != os.ErrNotExist {
170                 t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
171         }
172 }
173
174 // Put a test block and compare the locator with same content
175 // Test should pass for both writable and read-only volumes
176 func (s *genericVolumeSuite) testCompareSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
177         s.setup(t)
178         v := s.newVolume(t, factory)
179         defer v.Teardown()
180
181         v.PutRaw(testHash, testData)
182
183         // Compare the block locator with same content
184         err := v.Compare(context.Background(), testHash, testData)
185         if err != nil {
186                 t.Errorf("Got err %q, expected nil", err)
187         }
188 }
189
190 // Test behavior of Compare() when stored data matches expected
191 // checksum but differs from new data we need to store. Requires
192 // testHash = md5(testDataA).
193 //
194 // Test should pass for both writable and read-only volumes
195 func (s *genericVolumeSuite) testCompareWithCollision(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
196         s.setup(t)
197         v := s.newVolume(t, factory)
198         defer v.Teardown()
199
200         v.PutRaw(testHash, testDataA)
201
202         // Compare the block locator with different content; collision
203         err := v.Compare(context.Background(), TestHash, testDataB)
204         if err == nil {
205                 t.Errorf("Got err nil, expected error due to collision")
206         }
207 }
208
209 // Test behavior of Compare() when stored data has become
210 // corrupted. Requires testHash = md5(testDataA) != md5(testDataB).
211 //
212 // Test should pass for both writable and read-only volumes
213 func (s *genericVolumeSuite) testCompareWithCorruptStoredData(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
214         s.setup(t)
215         v := s.newVolume(t, factory)
216         defer v.Teardown()
217
218         v.PutRaw(TestHash, testDataB)
219
220         err := v.Compare(context.Background(), testHash, testDataA)
221         if err == nil || err == CollisionError {
222                 t.Errorf("Got err %+v, expected non-collision error", err)
223         }
224 }
225
226 // Put a block and put again with same content
227 // Test is intended for only writable volumes
228 func (s *genericVolumeSuite) testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
229         s.setup(t)
230         v := s.newVolume(t, factory)
231         defer v.Teardown()
232
233         err := v.Put(context.Background(), testHash, testData)
234         if err != nil {
235                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
236         }
237
238         err = v.Put(context.Background(), testHash, testData)
239         if err != nil {
240                 t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
241         }
242 }
243
244 // Put a block and put again with different content
245 // Test is intended for only writable volumes
246 func (s *genericVolumeSuite) testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
247         s.setup(t)
248         v := s.newVolume(t, factory)
249         defer v.Teardown()
250
251         v.PutRaw(testHash, testDataA)
252
253         putErr := v.Put(context.Background(), testHash, testDataB)
254         buf := make([]byte, BlockSize)
255         n, getErr := v.Get(context.Background(), testHash, buf)
256         if putErr == nil {
257                 // Put must not return a nil error unless it has
258                 // overwritten the existing data.
259                 if bytes.Compare(buf[:n], testDataB) != 0 {
260                         t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB)
261                 }
262         } else {
263                 // It is permissible for Put to fail, but it must
264                 // leave us with either the original data, the new
265                 // data, or nothing at all.
266                 if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 {
267                         t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB)
268                 }
269         }
270 }
271
272 // Put and get multiple blocks
273 // Test is intended for only writable volumes
274 func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
275         s.setup(t)
276         v := s.newVolume(t, factory)
277         defer v.Teardown()
278
279         err := v.Put(context.Background(), TestHash, TestBlock)
280         if err != nil {
281                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
282         }
283
284         err = v.Put(context.Background(), TestHash2, TestBlock2)
285         if err != nil {
286                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
287         }
288
289         err = v.Put(context.Background(), TestHash3, TestBlock3)
290         if err != nil {
291                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
292         }
293
294         data := make([]byte, BlockSize)
295         n, err := v.Get(context.Background(), TestHash, data)
296         if err != nil {
297                 t.Error(err)
298         } else {
299                 if bytes.Compare(data[:n], TestBlock) != 0 {
300                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock)
301                 }
302         }
303
304         n, err = v.Get(context.Background(), TestHash2, data)
305         if err != nil {
306                 t.Error(err)
307         } else {
308                 if bytes.Compare(data[:n], TestBlock2) != 0 {
309                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2)
310                 }
311         }
312
313         n, err = v.Get(context.Background(), TestHash3, data)
314         if err != nil {
315                 t.Error(err)
316         } else {
317                 if bytes.Compare(data[:n], TestBlock3) != 0 {
318                         t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3)
319                 }
320         }
321 }
322
323 // testPutAndTouch checks that when applying PUT to a block that
324 // already exists, the block's modification time is updated.  Intended
325 // for only writable volumes.
326 func (s *genericVolumeSuite) testPutAndTouch(t TB, factory TestableVolumeFactory) {
327         s.setup(t)
328         v := s.newVolume(t, factory)
329         defer v.Teardown()
330
331         if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
332                 t.Error(err)
333         }
334
335         // We'll verify { t0 < threshold < t1 }, where t0 is the
336         // existing block's timestamp on disk before Put() and t1 is
337         // its timestamp after Put().
338         threshold := time.Now().Add(-time.Second)
339
340         // Set the stored block's mtime far enough in the past that we
341         // can see the difference between "timestamp didn't change"
342         // and "timestamp granularity is too low".
343         v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
344
345         // Make sure v.Mtime() agrees the above Utime really worked.
346         if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
347                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
348         }
349
350         // Write the same block again.
351         if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
352                 t.Error(err)
353         }
354
355         // Verify threshold < t1
356         if t1, err := v.Mtime(TestHash); err != nil {
357                 t.Error(err)
358         } else if t1.Before(threshold) {
359                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
360         }
361 }
362
363 // Touching a non-existing block should result in error.
364 // Test should pass for both writable and read-only volumes
365 func (s *genericVolumeSuite) testTouchNoSuchBlock(t TB, factory TestableVolumeFactory) {
366         s.setup(t)
367         v := s.newVolume(t, factory)
368         defer v.Teardown()
369
370         if err := v.Touch(TestHash); err == nil {
371                 t.Error("Expected error when attempted to touch a non-existing block")
372         }
373 }
374
375 // Invoking Mtime on a non-existing block should result in error.
376 // Test should pass for both writable and read-only volumes
377 func (s *genericVolumeSuite) testMtimeNoSuchBlock(t TB, factory TestableVolumeFactory) {
378         s.setup(t)
379         v := s.newVolume(t, factory)
380         defer v.Teardown()
381
382         if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
383                 t.Error("Expected error when updating Mtime on a non-existing block")
384         }
385 }
386
387 // Put a few blocks and invoke IndexTo with:
388 // * no prefix
389 // * with a prefix
390 // * with no such prefix
391 // Test should pass for both writable and read-only volumes
392 func (s *genericVolumeSuite) testIndexTo(t TB, factory TestableVolumeFactory) {
393         s.setup(t)
394         v := s.newVolume(t, factory)
395         defer v.Teardown()
396
397         // minMtime and maxMtime are the minimum and maximum
398         // acceptable values the index can report for our test
399         // blocks. 1-second precision is acceptable.
400         minMtime := time.Now().UTC().UnixNano()
401         minMtime -= minMtime % 1e9
402
403         v.PutRaw(TestHash, TestBlock)
404         v.PutRaw(TestHash2, TestBlock2)
405         v.PutRaw(TestHash3, TestBlock3)
406
407         maxMtime := time.Now().UTC().UnixNano()
408         if maxMtime%1e9 > 0 {
409                 maxMtime -= maxMtime % 1e9
410                 maxMtime += 1e9
411         }
412
413         // Blocks whose names aren't Keep hashes should be omitted from
414         // index
415         v.PutRaw("fffffffffnotreallyahashfffffffff", nil)
416         v.PutRaw("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
417         v.PutRaw("f0000000000000000000000000000000f", nil)
418         v.PutRaw("f00", nil)
419
420         buf := new(bytes.Buffer)
421         v.IndexTo("", buf)
422         indexRows := strings.Split(string(buf.Bytes()), "\n")
423         sort.Strings(indexRows)
424         sortedIndex := strings.Join(indexRows, "\n")
425         m := regexp.MustCompile(
426                 `^\n` + TestHash + `\+\d+ (\d+)\n` +
427                         TestHash3 + `\+\d+ \d+\n` +
428                         TestHash2 + `\+\d+ \d+$`,
429         ).FindStringSubmatch(sortedIndex)
430         if m == nil {
431                 t.Errorf("Got index %q for empty prefix", sortedIndex)
432         } else {
433                 mtime, err := strconv.ParseInt(m[1], 10, 64)
434                 if err != nil {
435                         t.Error(err)
436                 } else if mtime < minMtime || mtime > maxMtime {
437                         t.Errorf("got %d for TestHash timestamp, expected %d <= t <= %d",
438                                 mtime, minMtime, maxMtime)
439                 }
440         }
441
442         for _, prefix := range []string{"f", "f15", "f15ac"} {
443                 buf = new(bytes.Buffer)
444                 v.IndexTo(prefix, buf)
445
446                 m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
447                 if err != nil {
448                         t.Error(err)
449                 } else if !m {
450                         t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
451                 }
452         }
453
454         for _, prefix := range []string{"zero", "zip", "zilch"} {
455                 buf = new(bytes.Buffer)
456                 err := v.IndexTo(prefix, buf)
457                 if err != nil {
458                         t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
459                 } else if buf.Len() != 0 {
460                         t.Errorf("Expected empty list for IndexTo with no such prefix %s", prefix)
461                 }
462         }
463 }
464
465 // Calling Delete() for a block immediately after writing it (not old enough)
466 // should neither delete the data nor return an error.
467 // Test is intended for only writable volumes
468 func (s *genericVolumeSuite) testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
469         s.setup(t)
470         s.cluster.Collections.BlobSigningTTL.Set("5m")
471         v := s.newVolume(t, factory)
472         defer v.Teardown()
473
474         v.Put(context.Background(), TestHash, TestBlock)
475
476         if err := v.Trash(TestHash); err != nil {
477                 t.Error(err)
478         }
479         data := make([]byte, BlockSize)
480         n, err := v.Get(context.Background(), TestHash, data)
481         if err != nil {
482                 t.Error(err)
483         } else if bytes.Compare(data[:n], TestBlock) != 0 {
484                 t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock)
485         }
486 }
487
488 // Calling Delete() for a block with a timestamp older than
489 // BlobSigningTTL seconds in the past should delete the data.  Test is
490 // intended for only writable volumes
491 func (s *genericVolumeSuite) testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
492         s.setup(t)
493         s.cluster.Collections.BlobSigningTTL.Set("5m")
494         v := s.newVolume(t, factory)
495         defer v.Teardown()
496
497         v.Put(context.Background(), TestHash, TestBlock)
498         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
499
500         if err := v.Trash(TestHash); err != nil {
501                 t.Error(err)
502         }
503         data := make([]byte, BlockSize)
504         if _, err := v.Get(context.Background(), TestHash, data); err == nil || !os.IsNotExist(err) {
505                 t.Errorf("os.IsNotExist(%v) should have been true", err)
506         }
507
508         _, err := v.Mtime(TestHash)
509         if err == nil || !os.IsNotExist(err) {
510                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
511         }
512
513         err = v.Compare(context.Background(), TestHash, TestBlock)
514         if err == nil || !os.IsNotExist(err) {
515                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
516         }
517
518         indexBuf := new(bytes.Buffer)
519         v.IndexTo("", indexBuf)
520         if strings.Contains(string(indexBuf.Bytes()), TestHash) {
521                 t.Fatalf("Found trashed block in IndexTo")
522         }
523
524         err = v.Touch(TestHash)
525         if err == nil || !os.IsNotExist(err) {
526                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
527         }
528 }
529
530 // Calling Delete() for a block that does not exist should result in error.
531 // Test should pass for both writable and read-only volumes
532 func (s *genericVolumeSuite) testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
533         s.setup(t)
534         v := s.newVolume(t, factory)
535         defer v.Teardown()
536
537         if err := v.Trash(TestHash2); err == nil {
538                 t.Errorf("Expected error when attempting to delete a non-existing block")
539         }
540 }
541
542 // Invoke Status and verify that VolumeStatus is returned
543 // Test should pass for both writable and read-only volumes
544 func (s *genericVolumeSuite) testStatus(t TB, factory TestableVolumeFactory) {
545         s.setup(t)
546         v := s.newVolume(t, factory)
547         defer v.Teardown()
548
549         // Get node status and make a basic sanity check.
550         status := v.Status()
551         if status.DeviceNum == 0 {
552                 t.Errorf("uninitialized device_num in %v", status)
553         }
554
555         if status.BytesFree == 0 {
556                 t.Errorf("uninitialized bytes_free in %v", status)
557         }
558
559         if status.BytesUsed == 0 {
560                 t.Errorf("uninitialized bytes_used in %v", status)
561         }
562 }
563
564 func getValueFrom(cv *prometheus.CounterVec, lbls prometheus.Labels) float64 {
565         c, _ := cv.GetMetricWith(lbls)
566         pb := &dto.Metric{}
567         c.Write(pb)
568         return pb.GetCounter().GetValue()
569 }
570
571 func (s *genericVolumeSuite) testMetrics(t TB, readonly bool, factory TestableVolumeFactory) {
572         var err error
573
574         s.setup(t)
575         v := s.newVolume(t, factory)
576         defer v.Teardown()
577
578         opsC, _, ioC := s.metrics.getCounterVecsFor(prometheus.Labels{"device_id": v.GetDeviceID()})
579
580         if ioC == nil {
581                 t.Error("ioBytes CounterVec is nil")
582                 return
583         }
584
585         if getValueFrom(ioC, prometheus.Labels{"direction": "out"})+
586                 getValueFrom(ioC, prometheus.Labels{"direction": "in"}) > 0 {
587                 t.Error("ioBytes counter should be zero")
588         }
589
590         if opsC == nil {
591                 t.Error("opsCounter CounterVec is nil")
592                 return
593         }
594
595         var c, writeOpCounter, readOpCounter float64
596
597         readOpType, writeOpType := v.ReadWriteOperationLabelValues()
598         writeOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
599         readOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
600
601         // Test Put if volume is writable
602         if !readonly {
603                 err = v.Put(context.Background(), TestHash, TestBlock)
604                 if err != nil {
605                         t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
606                 }
607                 // Check that the write operations counter increased
608                 c = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
609                 if c <= writeOpCounter {
610                         t.Error("Operation(s) not counted on Put")
611                 }
612                 // Check that bytes counter is > 0
613                 if getValueFrom(ioC, prometheus.Labels{"direction": "out"}) == 0 {
614                         t.Error("ioBytes{direction=out} counter shouldn't be zero")
615                 }
616         } else {
617                 v.PutRaw(TestHash, TestBlock)
618         }
619
620         buf := make([]byte, BlockSize)
621         _, err = v.Get(context.Background(), TestHash, buf)
622         if err != nil {
623                 t.Fatal(err)
624         }
625
626         // Check that the operations counter increased
627         c = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
628         if c <= readOpCounter {
629                 t.Error("Operation(s) not counted on Get")
630         }
631         // Check that the bytes "in" counter is > 0
632         if getValueFrom(ioC, prometheus.Labels{"direction": "in"}) == 0 {
633                 t.Error("ioBytes{direction=in} counter shouldn't be zero")
634         }
635 }
636
637 // Invoke String for the volume; expect non-empty result
638 // Test should pass for both writable and read-only volumes
639 func (s *genericVolumeSuite) testString(t TB, factory TestableVolumeFactory) {
640         s.setup(t)
641         v := s.newVolume(t, factory)
642         defer v.Teardown()
643
644         if id := v.String(); len(id) == 0 {
645                 t.Error("Got empty string for v.String()")
646         }
647 }
648
649 // Putting, updating, touching, and deleting blocks from a read-only volume result in error.
650 // Test is intended for only read-only volumes
651 func (s *genericVolumeSuite) testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
652         s.setup(t)
653         v := s.newVolume(t, factory)
654         defer v.Teardown()
655
656         v.PutRaw(TestHash, TestBlock)
657         buf := make([]byte, BlockSize)
658
659         // Get from read-only volume should succeed
660         _, err := v.Get(context.Background(), TestHash, buf)
661         if err != nil {
662                 t.Errorf("got err %v, expected nil", err)
663         }
664
665         // Put a new block to read-only volume should result in error
666         err = v.Put(context.Background(), TestHash2, TestBlock2)
667         if err == nil {
668                 t.Errorf("Expected error when putting block in a read-only volume")
669         }
670         _, err = v.Get(context.Background(), TestHash2, buf)
671         if err == nil {
672                 t.Errorf("Expected error when getting block whose put in read-only volume failed")
673         }
674
675         // Touch a block in read-only volume should result in error
676         err = v.Touch(TestHash)
677         if err == nil {
678                 t.Errorf("Expected error when touching block in a read-only volume")
679         }
680
681         // Delete a block from a read-only volume should result in error
682         err = v.Trash(TestHash)
683         if err == nil {
684                 t.Errorf("Expected error when deleting block from a read-only volume")
685         }
686
687         // Overwriting an existing block in read-only volume should result in error
688         err = v.Put(context.Background(), TestHash, TestBlock)
689         if err == nil {
690                 t.Errorf("Expected error when putting block in a read-only volume")
691         }
692 }
693
694 // Launch concurrent Gets
695 // Test should pass for both writable and read-only volumes
696 func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFactory) {
697         s.setup(t)
698         v := s.newVolume(t, factory)
699         defer v.Teardown()
700
701         v.PutRaw(TestHash, TestBlock)
702         v.PutRaw(TestHash2, TestBlock2)
703         v.PutRaw(TestHash3, TestBlock3)
704
705         sem := make(chan int)
706         go func() {
707                 buf := make([]byte, BlockSize)
708                 n, err := v.Get(context.Background(), TestHash, buf)
709                 if err != nil {
710                         t.Errorf("err1: %v", err)
711                 }
712                 if bytes.Compare(buf[:n], TestBlock) != 0 {
713                         t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n]))
714                 }
715                 sem <- 1
716         }()
717
718         go func() {
719                 buf := make([]byte, BlockSize)
720                 n, err := v.Get(context.Background(), TestHash2, buf)
721                 if err != nil {
722                         t.Errorf("err2: %v", err)
723                 }
724                 if bytes.Compare(buf[:n], TestBlock2) != 0 {
725                         t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n]))
726                 }
727                 sem <- 1
728         }()
729
730         go func() {
731                 buf := make([]byte, BlockSize)
732                 n, err := v.Get(context.Background(), TestHash3, buf)
733                 if err != nil {
734                         t.Errorf("err3: %v", err)
735                 }
736                 if bytes.Compare(buf[:n], TestBlock3) != 0 {
737                         t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n]))
738                 }
739                 sem <- 1
740         }()
741
742         // Wait for all goroutines to finish
743         for done := 0; done < 3; done++ {
744                 <-sem
745         }
746 }
747
748 // Launch concurrent Puts
749 // Test is intended for only writable volumes
750 func (s *genericVolumeSuite) testPutConcurrent(t TB, factory TestableVolumeFactory) {
751         s.setup(t)
752         v := s.newVolume(t, factory)
753         defer v.Teardown()
754
755         sem := make(chan int)
756         go func(sem chan int) {
757                 err := v.Put(context.Background(), TestHash, TestBlock)
758                 if err != nil {
759                         t.Errorf("err1: %v", err)
760                 }
761                 sem <- 1
762         }(sem)
763
764         go func(sem chan int) {
765                 err := v.Put(context.Background(), TestHash2, TestBlock2)
766                 if err != nil {
767                         t.Errorf("err2: %v", err)
768                 }
769                 sem <- 1
770         }(sem)
771
772         go func(sem chan int) {
773                 err := v.Put(context.Background(), TestHash3, TestBlock3)
774                 if err != nil {
775                         t.Errorf("err3: %v", err)
776                 }
777                 sem <- 1
778         }(sem)
779
780         // Wait for all goroutines to finish
781         for done := 0; done < 3; done++ {
782                 <-sem
783         }
784
785         // Double check that we actually wrote the blocks we expected to write.
786         buf := make([]byte, BlockSize)
787         n, err := v.Get(context.Background(), TestHash, buf)
788         if err != nil {
789                 t.Errorf("Get #1: %v", err)
790         }
791         if bytes.Compare(buf[:n], TestBlock) != 0 {
792                 t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
793         }
794
795         n, err = v.Get(context.Background(), TestHash2, buf)
796         if err != nil {
797                 t.Errorf("Get #2: %v", err)
798         }
799         if bytes.Compare(buf[:n], TestBlock2) != 0 {
800                 t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
801         }
802
803         n, err = v.Get(context.Background(), TestHash3, buf)
804         if err != nil {
805                 t.Errorf("Get #3: %v", err)
806         }
807         if bytes.Compare(buf[:n], TestBlock3) != 0 {
808                 t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n]))
809         }
810 }
811
812 // Write and read back a full size block
813 func (s *genericVolumeSuite) testPutFullBlock(t TB, factory TestableVolumeFactory) {
814         s.setup(t)
815         v := s.newVolume(t, factory)
816         defer v.Teardown()
817
818         wdata := make([]byte, BlockSize)
819         wdata[0] = 'a'
820         wdata[BlockSize-1] = 'z'
821         hash := fmt.Sprintf("%x", md5.Sum(wdata))
822         err := v.Put(context.Background(), hash, wdata)
823         if err != nil {
824                 t.Fatal(err)
825         }
826         buf := make([]byte, BlockSize)
827         n, err := v.Get(context.Background(), hash, buf)
828         if err != nil {
829                 t.Error(err)
830         }
831         if bytes.Compare(buf[:n], wdata) != 0 {
832                 t.Error("buf %+q != wdata %+q", buf[:n], wdata)
833         }
834 }
835
836 // With BlobTrashLifetime != 0, perform:
837 // Trash an old block - which either raises ErrNotImplemented or succeeds
838 // Untrash -  which either raises ErrNotImplemented or succeeds
839 // Get - which must succeed
840 func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory TestableVolumeFactory) {
841         s.setup(t)
842         s.cluster.Collections.BlobTrashLifetime.Set("1h")
843         v := s.newVolume(t, factory)
844         defer v.Teardown()
845
846         // put block and backdate it
847         v.PutRaw(TestHash, TestBlock)
848         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
849
850         buf := make([]byte, BlockSize)
851         n, err := v.Get(context.Background(), TestHash, buf)
852         if err != nil {
853                 t.Fatal(err)
854         }
855         if bytes.Compare(buf[:n], TestBlock) != 0 {
856                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
857         }
858
859         // Trash
860         err = v.Trash(TestHash)
861         if readonly {
862                 if err != MethodDisabledError {
863                         t.Fatal(err)
864                 }
865         } else if err != nil {
866                 if err != ErrNotImplemented {
867                         t.Fatal(err)
868                 }
869         } else {
870                 _, err = v.Get(context.Background(), TestHash, buf)
871                 if err == nil || !os.IsNotExist(err) {
872                         t.Errorf("os.IsNotExist(%v) should have been true", err)
873                 }
874
875                 // Untrash
876                 err = v.Untrash(TestHash)
877                 if err != nil {
878                         t.Fatal(err)
879                 }
880         }
881
882         // Get the block - after trash and untrash sequence
883         n, err = v.Get(context.Background(), TestHash, buf)
884         if err != nil {
885                 t.Fatal(err)
886         }
887         if bytes.Compare(buf[:n], TestBlock) != 0 {
888                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
889         }
890 }
891
892 func (s *genericVolumeSuite) testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
893         s.setup(t)
894         v := s.newVolume(t, factory)
895         defer v.Teardown()
896
897         checkGet := func() error {
898                 buf := make([]byte, BlockSize)
899                 n, err := v.Get(context.Background(), TestHash, buf)
900                 if err != nil {
901                         return err
902                 }
903                 if bytes.Compare(buf[:n], TestBlock) != 0 {
904                         t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
905                 }
906
907                 _, err = v.Mtime(TestHash)
908                 if err != nil {
909                         return err
910                 }
911
912                 err = v.Compare(context.Background(), TestHash, TestBlock)
913                 if err != nil {
914                         return err
915                 }
916
917                 indexBuf := new(bytes.Buffer)
918                 v.IndexTo("", indexBuf)
919                 if !strings.Contains(string(indexBuf.Bytes()), TestHash) {
920                         return os.ErrNotExist
921                 }
922
923                 return nil
924         }
925
926         // First set: EmptyTrash before reaching the trash deadline.
927
928         s.cluster.Collections.BlobTrashLifetime.Set("1h")
929
930         v.PutRaw(TestHash, TestBlock)
931         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
932
933         err := checkGet()
934         if err != nil {
935                 t.Fatal(err)
936         }
937
938         // Trash the block
939         err = v.Trash(TestHash)
940         if err == MethodDisabledError || err == ErrNotImplemented {
941                 // Skip the trash tests for read-only volumes, and
942                 // volume types that don't support
943                 // BlobTrashLifetime>0.
944                 return
945         }
946
947         err = checkGet()
948         if err == nil || !os.IsNotExist(err) {
949                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
950         }
951
952         err = v.Touch(TestHash)
953         if err == nil || !os.IsNotExist(err) {
954                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
955         }
956
957         v.EmptyTrash()
958
959         // Even after emptying the trash, we can untrash our block
960         // because the deadline hasn't been reached.
961         err = v.Untrash(TestHash)
962         if err != nil {
963                 t.Fatal(err)
964         }
965
966         err = checkGet()
967         if err != nil {
968                 t.Fatal(err)
969         }
970
971         err = v.Touch(TestHash)
972         if err != nil {
973                 t.Fatal(err)
974         }
975
976         // Because we Touch'ed, need to backdate again for next set of tests
977         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
978
979         // If the only block in the trash has already been untrashed,
980         // most volumes will fail a subsequent Untrash with a 404, but
981         // it's also acceptable for Untrash to succeed.
982         err = v.Untrash(TestHash)
983         if err != nil && !os.IsNotExist(err) {
984                 t.Fatalf("Expected success or os.IsNotExist(), but got: %v", err)
985         }
986
987         // The additional Untrash should not interfere with our
988         // already-untrashed copy.
989         err = checkGet()
990         if err != nil {
991                 t.Fatal(err)
992         }
993
994         // Untrash might have updated the timestamp, so backdate again
995         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
996
997         // Second set: EmptyTrash after the trash deadline has passed.
998
999         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
1000
1001         err = v.Trash(TestHash)
1002         if err != nil {
1003                 t.Fatal(err)
1004         }
1005         err = checkGet()
1006         if err == nil || !os.IsNotExist(err) {
1007                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1008         }
1009
1010         // Even though 1ns has passed, we can untrash because we
1011         // haven't called EmptyTrash yet.
1012         err = v.Untrash(TestHash)
1013         if err != nil {
1014                 t.Fatal(err)
1015         }
1016         err = checkGet()
1017         if err != nil {
1018                 t.Fatal(err)
1019         }
1020
1021         // Trash it again, and this time call EmptyTrash so it really
1022         // goes away.
1023         // (In Azure volumes, un/trash changes Mtime, so first backdate again)
1024         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
1025         _ = v.Trash(TestHash)
1026         err = checkGet()
1027         if err == nil || !os.IsNotExist(err) {
1028                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1029         }
1030         v.EmptyTrash()
1031
1032         // Untrash won't find it
1033         err = v.Untrash(TestHash)
1034         if err == nil || !os.IsNotExist(err) {
1035                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1036         }
1037
1038         // Get block won't find it
1039         err = checkGet()
1040         if err == nil || !os.IsNotExist(err) {
1041                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1042         }
1043
1044         // Third set: If the same data block gets written again after
1045         // being trashed, and then the trash gets emptied, the newer
1046         // un-trashed copy doesn't get deleted along with it.
1047
1048         v.PutRaw(TestHash, TestBlock)
1049         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
1050
1051         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
1052         err = v.Trash(TestHash)
1053         if err != nil {
1054                 t.Fatal(err)
1055         }
1056         err = checkGet()
1057         if err == nil || !os.IsNotExist(err) {
1058                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1059         }
1060
1061         v.PutRaw(TestHash, TestBlock)
1062         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
1063
1064         // EmptyTrash should not delete the untrashed copy.
1065         v.EmptyTrash()
1066         err = checkGet()
1067         if err != nil {
1068                 t.Fatal(err)
1069         }
1070
1071         // Fourth set: If the same data block gets trashed twice with
1072         // different deadlines A and C, and then the trash is emptied
1073         // at intermediate time B (A < B < C), it is still possible to
1074         // untrash the block whose deadline is "C".
1075
1076         v.PutRaw(TestHash, TestBlock)
1077         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
1078
1079         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
1080         err = v.Trash(TestHash)
1081         if err != nil {
1082                 t.Fatal(err)
1083         }
1084
1085         v.PutRaw(TestHash, TestBlock)
1086         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
1087
1088         s.cluster.Collections.BlobTrashLifetime.Set("1h")
1089         err = v.Trash(TestHash)
1090         if err != nil {
1091                 t.Fatal(err)
1092         }
1093
1094         // EmptyTrash should not prevent us from recovering the
1095         // time.Hour ("C") trash
1096         v.EmptyTrash()
1097         err = v.Untrash(TestHash)
1098         if err != nil {
1099                 t.Fatal(err)
1100         }
1101         err = checkGet()
1102         if err != nil {
1103                 t.Fatal(err)
1104         }
1105 }