Warn if MaxKeepBlobBuffers > MaxConcurrentRequests.
[arvados.git] / services / keepstore / volume_generic_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "os"
13         "regexp"
14         "sort"
15         "strconv"
16         "strings"
17         "time"
18
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadostest"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/prometheus/client_golang/prometheus"
23         dto "github.com/prometheus/client_model/go"
24         "github.com/sirupsen/logrus"
25 )
26
27 type TB interface {
28         Error(args ...interface{})
29         Errorf(format string, args ...interface{})
30         Fail()
31         FailNow()
32         Failed() bool
33         Fatal(args ...interface{})
34         Fatalf(format string, args ...interface{})
35         Log(args ...interface{})
36         Logf(format string, args ...interface{})
37 }
38
39 // A TestableVolumeFactory returns a new TestableVolume. The factory
40 // function, and the TestableVolume it returns, can use "t" to write
41 // logs, fail the current test, etc.
42 type TestableVolumeFactory func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume
43
44 // DoGenericVolumeTests runs a set of tests that every TestableVolume
45 // is expected to pass. It calls factory to create a new TestableVolume
46 // for each test case, to avoid leaking state between tests.
47 func DoGenericVolumeTests(t TB, readonly bool, factory TestableVolumeFactory) {
48         var s genericVolumeSuite
49         s.volume.ReadOnly = readonly
50
51         s.testGet(t, factory)
52         s.testGetNoSuchBlock(t, factory)
53
54         s.testCompareNonexistent(t, factory)
55         s.testCompareSameContent(t, factory, TestHash, TestBlock)
56         s.testCompareSameContent(t, factory, EmptyHash, EmptyBlock)
57         s.testCompareWithCollision(t, factory, TestHash, TestBlock, []byte("baddata"))
58         s.testCompareWithCollision(t, factory, TestHash, TestBlock, EmptyBlock)
59         s.testCompareWithCollision(t, factory, EmptyHash, EmptyBlock, TestBlock)
60         s.testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, []byte("baddata"))
61         s.testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, EmptyBlock)
62         s.testCompareWithCorruptStoredData(t, factory, EmptyHash, EmptyBlock, []byte("baddata"))
63
64         if !readonly {
65                 s.testPutBlockWithSameContent(t, factory, TestHash, TestBlock)
66                 s.testPutBlockWithSameContent(t, factory, EmptyHash, EmptyBlock)
67                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], arvadostest.MD5CollisionData[1])
68                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, EmptyBlock, arvadostest.MD5CollisionData[0])
69                 s.testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], EmptyBlock)
70                 s.testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, arvadostest.MD5CollisionData[0])
71                 s.testPutMultipleBlocks(t, factory)
72
73                 s.testPutAndTouch(t, factory)
74         }
75         s.testTouchNoSuchBlock(t, factory)
76
77         s.testMtimeNoSuchBlock(t, factory)
78
79         s.testIndexTo(t, factory)
80
81         if !readonly {
82                 s.testDeleteNewBlock(t, factory)
83                 s.testDeleteOldBlock(t, factory)
84         }
85         s.testDeleteNoSuchBlock(t, factory)
86
87         s.testStatus(t, factory)
88
89         s.testMetrics(t, readonly, factory)
90
91         s.testString(t, factory)
92
93         if readonly {
94                 s.testUpdateReadOnly(t, factory)
95         }
96
97         s.testGetConcurrent(t, factory)
98         if !readonly {
99                 s.testPutConcurrent(t, factory)
100
101                 s.testPutFullBlock(t, factory)
102         }
103
104         s.testTrashUntrash(t, readonly, factory)
105         s.testTrashEmptyTrashUntrash(t, factory)
106 }
107
108 type genericVolumeSuite struct {
109         cluster  *arvados.Cluster
110         volume   arvados.Volume
111         logger   logrus.FieldLogger
112         metrics  *volumeMetricsVecs
113         registry *prometheus.Registry
114 }
115
116 func (s *genericVolumeSuite) setup(t TB) {
117         s.cluster = testCluster(t)
118         s.logger = ctxlog.TestLogger(t)
119         s.registry = prometheus.NewRegistry()
120         s.metrics = newVolumeMetricsVecs(s.registry)
121 }
122
123 func (s *genericVolumeSuite) newVolume(t TB, factory TestableVolumeFactory) TestableVolume {
124         return factory(t, s.cluster, s.volume, s.logger, s.metrics)
125 }
126
127 // Put a test block, get it and verify content
128 // Test should pass for both writable and read-only volumes
129 func (s *genericVolumeSuite) testGet(t TB, factory TestableVolumeFactory) {
130         s.setup(t)
131         v := s.newVolume(t, factory)
132         defer v.Teardown()
133
134         v.PutRaw(TestHash, TestBlock)
135
136         buf := make([]byte, BlockSize)
137         n, err := v.Get(context.Background(), TestHash, buf)
138         if err != nil {
139                 t.Fatal(err)
140         }
141
142         if bytes.Compare(buf[:n], TestBlock) != 0 {
143                 t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
144         }
145 }
146
147 // Invoke get on a block that does not exist in volume; should result in error
148 // Test should pass for both writable and read-only volumes
149 func (s *genericVolumeSuite) testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
150         s.setup(t)
151         v := s.newVolume(t, factory)
152         defer v.Teardown()
153
154         buf := make([]byte, BlockSize)
155         if _, err := v.Get(context.Background(), TestHash2, buf); err == nil {
156                 t.Errorf("Expected error while getting non-existing block %v", TestHash2)
157         }
158 }
159
160 // Compare() should return os.ErrNotExist if the block does not exist.
161 // Otherwise, writing new data causes CompareAndTouch() to generate
162 // error logs even though everything is working fine.
163 func (s *genericVolumeSuite) testCompareNonexistent(t TB, factory TestableVolumeFactory) {
164         s.setup(t)
165         v := s.newVolume(t, factory)
166         defer v.Teardown()
167
168         err := v.Compare(context.Background(), TestHash, TestBlock)
169         if err != os.ErrNotExist {
170                 t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
171         }
172 }
173
174 // Put a test block and compare the locator with same content
175 // Test should pass for both writable and read-only volumes
176 func (s *genericVolumeSuite) testCompareSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
177         s.setup(t)
178         v := s.newVolume(t, factory)
179         defer v.Teardown()
180
181         v.PutRaw(testHash, testData)
182
183         // Compare the block locator with same content
184         err := v.Compare(context.Background(), testHash, testData)
185         if err != nil {
186                 t.Errorf("Got err %q, expected nil", err)
187         }
188 }
189
190 // Test behavior of Compare() when stored data matches expected
191 // checksum but differs from new data we need to store. Requires
192 // testHash = md5(testDataA).
193 //
194 // Test should pass for both writable and read-only volumes
195 func (s *genericVolumeSuite) testCompareWithCollision(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
196         s.setup(t)
197         v := s.newVolume(t, factory)
198         defer v.Teardown()
199
200         v.PutRaw(testHash, testDataA)
201
202         // Compare the block locator with different content; collision
203         err := v.Compare(context.Background(), TestHash, testDataB)
204         if err == nil {
205                 t.Errorf("Got err nil, expected error due to collision")
206         }
207 }
208
209 // Test behavior of Compare() when stored data has become
210 // corrupted. Requires testHash = md5(testDataA) != md5(testDataB).
211 //
212 // Test should pass for both writable and read-only volumes
213 func (s *genericVolumeSuite) testCompareWithCorruptStoredData(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
214         s.setup(t)
215         v := s.newVolume(t, factory)
216         defer v.Teardown()
217
218         v.PutRaw(TestHash, testDataB)
219
220         err := v.Compare(context.Background(), testHash, testDataA)
221         if err == nil || err == CollisionError {
222                 t.Errorf("Got err %+v, expected non-collision error", err)
223         }
224 }
225
226 // Put a block and put again with same content
227 // Test is intended for only writable volumes
228 func (s *genericVolumeSuite) testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
229         s.setup(t)
230         v := s.newVolume(t, factory)
231         defer v.Teardown()
232
233         err := v.Put(context.Background(), testHash, testData)
234         if err != nil {
235                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
236         }
237
238         err = v.Put(context.Background(), testHash, testData)
239         if err != nil {
240                 t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
241         }
242 }
243
244 // Put a block and put again with different content
245 // Test is intended for only writable volumes
246 func (s *genericVolumeSuite) testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
247         s.setup(t)
248         v := s.newVolume(t, factory)
249         defer v.Teardown()
250
251         v.PutRaw(testHash, testDataA)
252
253         putErr := v.Put(context.Background(), testHash, testDataB)
254         buf := make([]byte, BlockSize)
255         n, getErr := v.Get(context.Background(), testHash, buf)
256         if putErr == nil {
257                 // Put must not return a nil error unless it has
258                 // overwritten the existing data.
259                 if bytes.Compare(buf[:n], testDataB) != 0 {
260                         t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB)
261                 }
262         } else {
263                 // It is permissible for Put to fail, but it must
264                 // leave us with either the original data, the new
265                 // data, or nothing at all.
266                 if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 {
267                         t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB)
268                 }
269         }
270 }
271
272 // Put and get multiple blocks
273 // Test is intended for only writable volumes
274 func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
275         s.setup(t)
276         v := s.newVolume(t, factory)
277         defer v.Teardown()
278
279         err := v.Put(context.Background(), TestHash, TestBlock)
280         if err != nil {
281                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
282         }
283
284         err = v.Put(context.Background(), TestHash2, TestBlock2)
285         if err != nil {
286                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
287         }
288
289         err = v.Put(context.Background(), TestHash3, TestBlock3)
290         if err != nil {
291                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
292         }
293
294         data := make([]byte, BlockSize)
295         n, err := v.Get(context.Background(), TestHash, data)
296         if err != nil {
297                 t.Error(err)
298         } else {
299                 if bytes.Compare(data[:n], TestBlock) != 0 {
300                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock)
301                 }
302         }
303
304         n, err = v.Get(context.Background(), TestHash2, data)
305         if err != nil {
306                 t.Error(err)
307         } else {
308                 if bytes.Compare(data[:n], TestBlock2) != 0 {
309                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2)
310                 }
311         }
312
313         n, err = v.Get(context.Background(), TestHash3, data)
314         if err != nil {
315                 t.Error(err)
316         } else {
317                 if bytes.Compare(data[:n], TestBlock3) != 0 {
318                         t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3)
319                 }
320         }
321 }
322
323 // testPutAndTouch
324 //   Test that when applying PUT to a block that already exists,
325 //   the block's modification time is updated.
326 // Test is intended for only writable volumes
327 func (s *genericVolumeSuite) testPutAndTouch(t TB, factory TestableVolumeFactory) {
328         s.setup(t)
329         v := s.newVolume(t, factory)
330         defer v.Teardown()
331
332         if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
333                 t.Error(err)
334         }
335
336         // We'll verify { t0 < threshold < t1 }, where t0 is the
337         // existing block's timestamp on disk before Put() and t1 is
338         // its timestamp after Put().
339         threshold := time.Now().Add(-time.Second)
340
341         // Set the stored block's mtime far enough in the past that we
342         // can see the difference between "timestamp didn't change"
343         // and "timestamp granularity is too low".
344         v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
345
346         // Make sure v.Mtime() agrees the above Utime really worked.
347         if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
348                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
349         }
350
351         // Write the same block again.
352         if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
353                 t.Error(err)
354         }
355
356         // Verify threshold < t1
357         if t1, err := v.Mtime(TestHash); err != nil {
358                 t.Error(err)
359         } else if t1.Before(threshold) {
360                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
361         }
362 }
363
364 // Touching a non-existing block should result in error.
365 // Test should pass for both writable and read-only volumes
366 func (s *genericVolumeSuite) testTouchNoSuchBlock(t TB, factory TestableVolumeFactory) {
367         s.setup(t)
368         v := s.newVolume(t, factory)
369         defer v.Teardown()
370
371         if err := v.Touch(TestHash); err == nil {
372                 t.Error("Expected error when attempted to touch a non-existing block")
373         }
374 }
375
376 // Invoking Mtime on a non-existing block should result in error.
377 // Test should pass for both writable and read-only volumes
378 func (s *genericVolumeSuite) testMtimeNoSuchBlock(t TB, factory TestableVolumeFactory) {
379         s.setup(t)
380         v := s.newVolume(t, factory)
381         defer v.Teardown()
382
383         if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
384                 t.Error("Expected error when updating Mtime on a non-existing block")
385         }
386 }
387
388 // Put a few blocks and invoke IndexTo with:
389 // * no prefix
390 // * with a prefix
391 // * with no such prefix
392 // Test should pass for both writable and read-only volumes
393 func (s *genericVolumeSuite) testIndexTo(t TB, factory TestableVolumeFactory) {
394         s.setup(t)
395         v := s.newVolume(t, factory)
396         defer v.Teardown()
397
398         // minMtime and maxMtime are the minimum and maximum
399         // acceptable values the index can report for our test
400         // blocks. 1-second precision is acceptable.
401         minMtime := time.Now().UTC().UnixNano()
402         minMtime -= minMtime % 1e9
403
404         v.PutRaw(TestHash, TestBlock)
405         v.PutRaw(TestHash2, TestBlock2)
406         v.PutRaw(TestHash3, TestBlock3)
407
408         maxMtime := time.Now().UTC().UnixNano()
409         if maxMtime%1e9 > 0 {
410                 maxMtime -= maxMtime % 1e9
411                 maxMtime += 1e9
412         }
413
414         // Blocks whose names aren't Keep hashes should be omitted from
415         // index
416         v.PutRaw("fffffffffnotreallyahashfffffffff", nil)
417         v.PutRaw("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
418         v.PutRaw("f0000000000000000000000000000000f", nil)
419         v.PutRaw("f00", nil)
420
421         buf := new(bytes.Buffer)
422         v.IndexTo("", buf)
423         indexRows := strings.Split(string(buf.Bytes()), "\n")
424         sort.Strings(indexRows)
425         sortedIndex := strings.Join(indexRows, "\n")
426         m := regexp.MustCompile(
427                 `^\n` + TestHash + `\+\d+ (\d+)\n` +
428                         TestHash3 + `\+\d+ \d+\n` +
429                         TestHash2 + `\+\d+ \d+$`,
430         ).FindStringSubmatch(sortedIndex)
431         if m == nil {
432                 t.Errorf("Got index %q for empty prefix", sortedIndex)
433         } else {
434                 mtime, err := strconv.ParseInt(m[1], 10, 64)
435                 if err != nil {
436                         t.Error(err)
437                 } else if mtime < minMtime || mtime > maxMtime {
438                         t.Errorf("got %d for TestHash timestamp, expected %d <= t <= %d",
439                                 mtime, minMtime, maxMtime)
440                 }
441         }
442
443         for _, prefix := range []string{"f", "f15", "f15ac"} {
444                 buf = new(bytes.Buffer)
445                 v.IndexTo(prefix, buf)
446
447                 m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
448                 if err != nil {
449                         t.Error(err)
450                 } else if !m {
451                         t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
452                 }
453         }
454
455         for _, prefix := range []string{"zero", "zip", "zilch"} {
456                 buf = new(bytes.Buffer)
457                 err := v.IndexTo(prefix, buf)
458                 if err != nil {
459                         t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
460                 } else if buf.Len() != 0 {
461                         t.Errorf("Expected empty list for IndexTo with no such prefix %s", prefix)
462                 }
463         }
464 }
465
466 // Calling Delete() for a block immediately after writing it (not old enough)
467 // should neither delete the data nor return an error.
468 // Test is intended for only writable volumes
469 func (s *genericVolumeSuite) testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
470         s.setup(t)
471         s.cluster.Collections.BlobSigningTTL.Set("5m")
472         v := s.newVolume(t, factory)
473         defer v.Teardown()
474
475         v.Put(context.Background(), TestHash, TestBlock)
476
477         if err := v.Trash(TestHash); err != nil {
478                 t.Error(err)
479         }
480         data := make([]byte, BlockSize)
481         n, err := v.Get(context.Background(), TestHash, data)
482         if err != nil {
483                 t.Error(err)
484         } else if bytes.Compare(data[:n], TestBlock) != 0 {
485                 t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock)
486         }
487 }
488
489 // Calling Delete() for a block with a timestamp older than
490 // BlobSigningTTL seconds in the past should delete the data.  Test is
491 // intended for only writable volumes
492 func (s *genericVolumeSuite) testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
493         s.setup(t)
494         s.cluster.Collections.BlobSigningTTL.Set("5m")
495         v := s.newVolume(t, factory)
496         defer v.Teardown()
497
498         v.Put(context.Background(), TestHash, TestBlock)
499         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
500
501         if err := v.Trash(TestHash); err != nil {
502                 t.Error(err)
503         }
504         data := make([]byte, BlockSize)
505         if _, err := v.Get(context.Background(), TestHash, data); err == nil || !os.IsNotExist(err) {
506                 t.Errorf("os.IsNotExist(%v) should have been true", err)
507         }
508
509         _, err := v.Mtime(TestHash)
510         if err == nil || !os.IsNotExist(err) {
511                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
512         }
513
514         err = v.Compare(context.Background(), TestHash, TestBlock)
515         if err == nil || !os.IsNotExist(err) {
516                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
517         }
518
519         indexBuf := new(bytes.Buffer)
520         v.IndexTo("", indexBuf)
521         if strings.Contains(string(indexBuf.Bytes()), TestHash) {
522                 t.Fatalf("Found trashed block in IndexTo")
523         }
524
525         err = v.Touch(TestHash)
526         if err == nil || !os.IsNotExist(err) {
527                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
528         }
529 }
530
531 // Calling Delete() for a block that does not exist should result in error.
532 // Test should pass for both writable and read-only volumes
533 func (s *genericVolumeSuite) testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
534         s.setup(t)
535         v := s.newVolume(t, factory)
536         defer v.Teardown()
537
538         if err := v.Trash(TestHash2); err == nil {
539                 t.Errorf("Expected error when attempting to delete a non-existing block")
540         }
541 }
542
543 // Invoke Status and verify that VolumeStatus is returned
544 // Test should pass for both writable and read-only volumes
545 func (s *genericVolumeSuite) testStatus(t TB, factory TestableVolumeFactory) {
546         s.setup(t)
547         v := s.newVolume(t, factory)
548         defer v.Teardown()
549
550         // Get node status and make a basic sanity check.
551         status := v.Status()
552         if status.DeviceNum == 0 {
553                 t.Errorf("uninitialized device_num in %v", status)
554         }
555
556         if status.BytesFree == 0 {
557                 t.Errorf("uninitialized bytes_free in %v", status)
558         }
559
560         if status.BytesUsed == 0 {
561                 t.Errorf("uninitialized bytes_used in %v", status)
562         }
563 }
564
565 func getValueFrom(cv *prometheus.CounterVec, lbls prometheus.Labels) float64 {
566         c, _ := cv.GetMetricWith(lbls)
567         pb := &dto.Metric{}
568         c.Write(pb)
569         return pb.GetCounter().GetValue()
570 }
571
572 func (s *genericVolumeSuite) testMetrics(t TB, readonly bool, factory TestableVolumeFactory) {
573         var err error
574
575         s.setup(t)
576         v := s.newVolume(t, factory)
577         defer v.Teardown()
578
579         opsC, _, ioC := s.metrics.getCounterVecsFor(prometheus.Labels{"device_id": v.GetDeviceID()})
580
581         if ioC == nil {
582                 t.Error("ioBytes CounterVec is nil")
583                 return
584         }
585
586         if getValueFrom(ioC, prometheus.Labels{"direction": "out"})+
587                 getValueFrom(ioC, prometheus.Labels{"direction": "in"}) > 0 {
588                 t.Error("ioBytes counter should be zero")
589         }
590
591         if opsC == nil {
592                 t.Error("opsCounter CounterVec is nil")
593                 return
594         }
595
596         var c, writeOpCounter, readOpCounter float64
597
598         readOpType, writeOpType := v.ReadWriteOperationLabelValues()
599         writeOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
600         readOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
601
602         // Test Put if volume is writable
603         if !readonly {
604                 err = v.Put(context.Background(), TestHash, TestBlock)
605                 if err != nil {
606                         t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
607                 }
608                 // Check that the write operations counter increased
609                 c = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
610                 if c <= writeOpCounter {
611                         t.Error("Operation(s) not counted on Put")
612                 }
613                 // Check that bytes counter is > 0
614                 if getValueFrom(ioC, prometheus.Labels{"direction": "out"}) == 0 {
615                         t.Error("ioBytes{direction=out} counter shouldn't be zero")
616                 }
617         } else {
618                 v.PutRaw(TestHash, TestBlock)
619         }
620
621         buf := make([]byte, BlockSize)
622         _, err = v.Get(context.Background(), TestHash, buf)
623         if err != nil {
624                 t.Fatal(err)
625         }
626
627         // Check that the operations counter increased
628         c = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
629         if c <= readOpCounter {
630                 t.Error("Operation(s) not counted on Get")
631         }
632         // Check that the bytes "in" counter is > 0
633         if getValueFrom(ioC, prometheus.Labels{"direction": "in"}) == 0 {
634                 t.Error("ioBytes{direction=in} counter shouldn't be zero")
635         }
636 }
637
638 // Invoke String for the volume; expect non-empty result
639 // Test should pass for both writable and read-only volumes
640 func (s *genericVolumeSuite) testString(t TB, factory TestableVolumeFactory) {
641         s.setup(t)
642         v := s.newVolume(t, factory)
643         defer v.Teardown()
644
645         if id := v.String(); len(id) == 0 {
646                 t.Error("Got empty string for v.String()")
647         }
648 }
649
650 // Putting, updating, touching, and deleting blocks from a read-only volume result in error.
651 // Test is intended for only read-only volumes
652 func (s *genericVolumeSuite) testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
653         s.setup(t)
654         v := s.newVolume(t, factory)
655         defer v.Teardown()
656
657         v.PutRaw(TestHash, TestBlock)
658         buf := make([]byte, BlockSize)
659
660         // Get from read-only volume should succeed
661         _, err := v.Get(context.Background(), TestHash, buf)
662         if err != nil {
663                 t.Errorf("got err %v, expected nil", err)
664         }
665
666         // Put a new block to read-only volume should result in error
667         err = v.Put(context.Background(), TestHash2, TestBlock2)
668         if err == nil {
669                 t.Errorf("Expected error when putting block in a read-only volume")
670         }
671         _, err = v.Get(context.Background(), TestHash2, buf)
672         if err == nil {
673                 t.Errorf("Expected error when getting block whose put in read-only volume failed")
674         }
675
676         // Touch a block in read-only volume should result in error
677         err = v.Touch(TestHash)
678         if err == nil {
679                 t.Errorf("Expected error when touching block in a read-only volume")
680         }
681
682         // Delete a block from a read-only volume should result in error
683         err = v.Trash(TestHash)
684         if err == nil {
685                 t.Errorf("Expected error when deleting block from a read-only volume")
686         }
687
688         // Overwriting an existing block in read-only volume should result in error
689         err = v.Put(context.Background(), TestHash, TestBlock)
690         if err == nil {
691                 t.Errorf("Expected error when putting block in a read-only volume")
692         }
693 }
694
695 // Launch concurrent Gets
696 // Test should pass for both writable and read-only volumes
697 func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFactory) {
698         s.setup(t)
699         v := s.newVolume(t, factory)
700         defer v.Teardown()
701
702         v.PutRaw(TestHash, TestBlock)
703         v.PutRaw(TestHash2, TestBlock2)
704         v.PutRaw(TestHash3, TestBlock3)
705
706         sem := make(chan int)
707         go func() {
708                 buf := make([]byte, BlockSize)
709                 n, err := v.Get(context.Background(), TestHash, buf)
710                 if err != nil {
711                         t.Errorf("err1: %v", err)
712                 }
713                 if bytes.Compare(buf[:n], TestBlock) != 0 {
714                         t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n]))
715                 }
716                 sem <- 1
717         }()
718
719         go func() {
720                 buf := make([]byte, BlockSize)
721                 n, err := v.Get(context.Background(), TestHash2, buf)
722                 if err != nil {
723                         t.Errorf("err2: %v", err)
724                 }
725                 if bytes.Compare(buf[:n], TestBlock2) != 0 {
726                         t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n]))
727                 }
728                 sem <- 1
729         }()
730
731         go func() {
732                 buf := make([]byte, BlockSize)
733                 n, err := v.Get(context.Background(), TestHash3, buf)
734                 if err != nil {
735                         t.Errorf("err3: %v", err)
736                 }
737                 if bytes.Compare(buf[:n], TestBlock3) != 0 {
738                         t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n]))
739                 }
740                 sem <- 1
741         }()
742
743         // Wait for all goroutines to finish
744         for done := 0; done < 3; done++ {
745                 <-sem
746         }
747 }
748
749 // Launch concurrent Puts
750 // Test is intended for only writable volumes
751 func (s *genericVolumeSuite) testPutConcurrent(t TB, factory TestableVolumeFactory) {
752         s.setup(t)
753         v := s.newVolume(t, factory)
754         defer v.Teardown()
755
756         sem := make(chan int)
757         go func(sem chan int) {
758                 err := v.Put(context.Background(), TestHash, TestBlock)
759                 if err != nil {
760                         t.Errorf("err1: %v", err)
761                 }
762                 sem <- 1
763         }(sem)
764
765         go func(sem chan int) {
766                 err := v.Put(context.Background(), TestHash2, TestBlock2)
767                 if err != nil {
768                         t.Errorf("err2: %v", err)
769                 }
770                 sem <- 1
771         }(sem)
772
773         go func(sem chan int) {
774                 err := v.Put(context.Background(), TestHash3, TestBlock3)
775                 if err != nil {
776                         t.Errorf("err3: %v", err)
777                 }
778                 sem <- 1
779         }(sem)
780
781         // Wait for all goroutines to finish
782         for done := 0; done < 3; done++ {
783                 <-sem
784         }
785
786         // Double check that we actually wrote the blocks we expected to write.
787         buf := make([]byte, BlockSize)
788         n, err := v.Get(context.Background(), TestHash, buf)
789         if err != nil {
790                 t.Errorf("Get #1: %v", err)
791         }
792         if bytes.Compare(buf[:n], TestBlock) != 0 {
793                 t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
794         }
795
796         n, err = v.Get(context.Background(), TestHash2, buf)
797         if err != nil {
798                 t.Errorf("Get #2: %v", err)
799         }
800         if bytes.Compare(buf[:n], TestBlock2) != 0 {
801                 t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
802         }
803
804         n, err = v.Get(context.Background(), TestHash3, buf)
805         if err != nil {
806                 t.Errorf("Get #3: %v", err)
807         }
808         if bytes.Compare(buf[:n], TestBlock3) != 0 {
809                 t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n]))
810         }
811 }
812
813 // Write and read back a full size block
814 func (s *genericVolumeSuite) testPutFullBlock(t TB, factory TestableVolumeFactory) {
815         s.setup(t)
816         v := s.newVolume(t, factory)
817         defer v.Teardown()
818
819         wdata := make([]byte, BlockSize)
820         wdata[0] = 'a'
821         wdata[BlockSize-1] = 'z'
822         hash := fmt.Sprintf("%x", md5.Sum(wdata))
823         err := v.Put(context.Background(), hash, wdata)
824         if err != nil {
825                 t.Fatal(err)
826         }
827         buf := make([]byte, BlockSize)
828         n, err := v.Get(context.Background(), hash, buf)
829         if err != nil {
830                 t.Error(err)
831         }
832         if bytes.Compare(buf[:n], wdata) != 0 {
833                 t.Error("buf %+q != wdata %+q", buf[:n], wdata)
834         }
835 }
836
837 // With BlobTrashLifetime != 0, perform:
838 // Trash an old block - which either raises ErrNotImplemented or succeeds
839 // Untrash -  which either raises ErrNotImplemented or succeeds
840 // Get - which must succeed
841 func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory TestableVolumeFactory) {
842         s.setup(t)
843         s.cluster.Collections.BlobTrashLifetime.Set("1h")
844         v := s.newVolume(t, factory)
845         defer v.Teardown()
846
847         // put block and backdate it
848         v.PutRaw(TestHash, TestBlock)
849         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
850
851         buf := make([]byte, BlockSize)
852         n, err := v.Get(context.Background(), TestHash, buf)
853         if err != nil {
854                 t.Fatal(err)
855         }
856         if bytes.Compare(buf[:n], TestBlock) != 0 {
857                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
858         }
859
860         // Trash
861         err = v.Trash(TestHash)
862         if readonly {
863                 if err != MethodDisabledError {
864                         t.Fatal(err)
865                 }
866         } else if err != nil {
867                 if err != ErrNotImplemented {
868                         t.Fatal(err)
869                 }
870         } else {
871                 _, err = v.Get(context.Background(), TestHash, buf)
872                 if err == nil || !os.IsNotExist(err) {
873                         t.Errorf("os.IsNotExist(%v) should have been true", err)
874                 }
875
876                 // Untrash
877                 err = v.Untrash(TestHash)
878                 if err != nil {
879                         t.Fatal(err)
880                 }
881         }
882
883         // Get the block - after trash and untrash sequence
884         n, err = v.Get(context.Background(), TestHash, buf)
885         if err != nil {
886                 t.Fatal(err)
887         }
888         if bytes.Compare(buf[:n], TestBlock) != 0 {
889                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
890         }
891 }
892
893 func (s *genericVolumeSuite) testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
894         s.setup(t)
895         v := s.newVolume(t, factory)
896         defer v.Teardown()
897
898         checkGet := func() error {
899                 buf := make([]byte, BlockSize)
900                 n, err := v.Get(context.Background(), TestHash, buf)
901                 if err != nil {
902                         return err
903                 }
904                 if bytes.Compare(buf[:n], TestBlock) != 0 {
905                         t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
906                 }
907
908                 _, err = v.Mtime(TestHash)
909                 if err != nil {
910                         return err
911                 }
912
913                 err = v.Compare(context.Background(), TestHash, TestBlock)
914                 if err != nil {
915                         return err
916                 }
917
918                 indexBuf := new(bytes.Buffer)
919                 v.IndexTo("", indexBuf)
920                 if !strings.Contains(string(indexBuf.Bytes()), TestHash) {
921                         return os.ErrNotExist
922                 }
923
924                 return nil
925         }
926
927         // First set: EmptyTrash before reaching the trash deadline.
928
929         s.cluster.Collections.BlobTrashLifetime.Set("1h")
930
931         v.PutRaw(TestHash, TestBlock)
932         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
933
934         err := checkGet()
935         if err != nil {
936                 t.Fatal(err)
937         }
938
939         // Trash the block
940         err = v.Trash(TestHash)
941         if err == MethodDisabledError || err == ErrNotImplemented {
942                 // Skip the trash tests for read-only volumes, and
943                 // volume types that don't support
944                 // BlobTrashLifetime>0.
945                 return
946         }
947
948         err = checkGet()
949         if err == nil || !os.IsNotExist(err) {
950                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
951         }
952
953         err = v.Touch(TestHash)
954         if err == nil || !os.IsNotExist(err) {
955                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
956         }
957
958         v.EmptyTrash()
959
960         // Even after emptying the trash, we can untrash our block
961         // because the deadline hasn't been reached.
962         err = v.Untrash(TestHash)
963         if err != nil {
964                 t.Fatal(err)
965         }
966
967         err = checkGet()
968         if err != nil {
969                 t.Fatal(err)
970         }
971
972         err = v.Touch(TestHash)
973         if err != nil {
974                 t.Fatal(err)
975         }
976
977         // Because we Touch'ed, need to backdate again for next set of tests
978         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
979
980         // If the only block in the trash has already been untrashed,
981         // most volumes will fail a subsequent Untrash with a 404, but
982         // it's also acceptable for Untrash to succeed.
983         err = v.Untrash(TestHash)
984         if err != nil && !os.IsNotExist(err) {
985                 t.Fatalf("Expected success or os.IsNotExist(), but got: %v", err)
986         }
987
988         // The additional Untrash should not interfere with our
989         // already-untrashed copy.
990         err = checkGet()
991         if err != nil {
992                 t.Fatal(err)
993         }
994
995         // Untrash might have updated the timestamp, so backdate again
996         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
997
998         // Second set: EmptyTrash after the trash deadline has passed.
999
1000         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
1001
1002         err = v.Trash(TestHash)
1003         if err != nil {
1004                 t.Fatal(err)
1005         }
1006         err = checkGet()
1007         if err == nil || !os.IsNotExist(err) {
1008                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1009         }
1010
1011         // Even though 1ns has passed, we can untrash because we
1012         // haven't called EmptyTrash yet.
1013         err = v.Untrash(TestHash)
1014         if err != nil {
1015                 t.Fatal(err)
1016         }
1017         err = checkGet()
1018         if err != nil {
1019                 t.Fatal(err)
1020         }
1021
1022         // Trash it again, and this time call EmptyTrash so it really
1023         // goes away.
1024         // (In Azure volumes, un/trash changes Mtime, so first backdate again)
1025         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
1026         _ = v.Trash(TestHash)
1027         err = checkGet()
1028         if err == nil || !os.IsNotExist(err) {
1029                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1030         }
1031         v.EmptyTrash()
1032
1033         // Untrash won't find it
1034         err = v.Untrash(TestHash)
1035         if err == nil || !os.IsNotExist(err) {
1036                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1037         }
1038
1039         // Get block won't find it
1040         err = checkGet()
1041         if err == nil || !os.IsNotExist(err) {
1042                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1043         }
1044
1045         // Third set: If the same data block gets written again after
1046         // being trashed, and then the trash gets emptied, the newer
1047         // un-trashed copy doesn't get deleted along with it.
1048
1049         v.PutRaw(TestHash, TestBlock)
1050         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
1051
1052         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
1053         err = v.Trash(TestHash)
1054         if err != nil {
1055                 t.Fatal(err)
1056         }
1057         err = checkGet()
1058         if err == nil || !os.IsNotExist(err) {
1059                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1060         }
1061
1062         v.PutRaw(TestHash, TestBlock)
1063         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
1064
1065         // EmptyTrash should not delete the untrashed copy.
1066         v.EmptyTrash()
1067         err = checkGet()
1068         if err != nil {
1069                 t.Fatal(err)
1070         }
1071
1072         // Fourth set: If the same data block gets trashed twice with
1073         // different deadlines A and C, and then the trash is emptied
1074         // at intermediate time B (A < B < C), it is still possible to
1075         // untrash the block whose deadline is "C".
1076
1077         v.PutRaw(TestHash, TestBlock)
1078         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
1079
1080         s.cluster.Collections.BlobTrashLifetime.Set("1ns")
1081         err = v.Trash(TestHash)
1082         if err != nil {
1083                 t.Fatal(err)
1084         }
1085
1086         v.PutRaw(TestHash, TestBlock)
1087         v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
1088
1089         s.cluster.Collections.BlobTrashLifetime.Set("1h")
1090         err = v.Trash(TestHash)
1091         if err != nil {
1092                 t.Fatal(err)
1093         }
1094
1095         // EmptyTrash should not prevent us from recovering the
1096         // time.Hour ("C") trash
1097         v.EmptyTrash()
1098         err = v.Untrash(TestHash)
1099         if err != nil {
1100                 t.Fatal(err)
1101         }
1102         err = checkGet()
1103         if err != nil {
1104                 t.Fatal(err)
1105         }
1106 }