13937: Adds tests for operation & I/O counters.
[arvados.git] / services / keepstore / volume_generic_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "os"
13         "regexp"
14         "sort"
15         "strconv"
16         "strings"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
21         "github.com/prometheus/client_golang/prometheus"
22         dto "github.com/prometheus/client_model/go"
23 )
24
25 type TB interface {
26         Error(args ...interface{})
27         Errorf(format string, args ...interface{})
28         Fail()
29         FailNow()
30         Failed() bool
31         Fatal(args ...interface{})
32         Fatalf(format string, args ...interface{})
33         Log(args ...interface{})
34         Logf(format string, args ...interface{})
35 }
36
37 // A TestableVolumeFactory returns a new TestableVolume. The factory
38 // function, and the TestableVolume it returns, can use "t" to write
39 // logs, fail the current test, etc.
40 type TestableVolumeFactory func(t TB) TestableVolume
41
42 // DoGenericVolumeTests runs a set of tests that every TestableVolume
43 // is expected to pass. It calls factory to create a new TestableVolume
44 // for each test case, to avoid leaking state between tests.
45 func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
46         testGet(t, factory)
47         testGetNoSuchBlock(t, factory)
48
49         testCompareNonexistent(t, factory)
50         testCompareSameContent(t, factory, TestHash, TestBlock)
51         testCompareSameContent(t, factory, EmptyHash, EmptyBlock)
52         testCompareWithCollision(t, factory, TestHash, TestBlock, []byte("baddata"))
53         testCompareWithCollision(t, factory, TestHash, TestBlock, EmptyBlock)
54         testCompareWithCollision(t, factory, EmptyHash, EmptyBlock, TestBlock)
55         testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, []byte("baddata"))
56         testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, EmptyBlock)
57         testCompareWithCorruptStoredData(t, factory, EmptyHash, EmptyBlock, []byte("baddata"))
58
59         testPutBlockWithSameContent(t, factory, TestHash, TestBlock)
60         testPutBlockWithSameContent(t, factory, EmptyHash, EmptyBlock)
61         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], arvadostest.MD5CollisionData[1])
62         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, EmptyBlock, arvadostest.MD5CollisionData[0])
63         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], EmptyBlock)
64         testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, arvadostest.MD5CollisionData[0])
65         testPutMultipleBlocks(t, factory)
66
67         testPutAndTouch(t, factory)
68         testTouchNoSuchBlock(t, factory)
69
70         testMtimeNoSuchBlock(t, factory)
71
72         testIndexTo(t, factory)
73
74         testDeleteNewBlock(t, factory)
75         testDeleteOldBlock(t, factory)
76         testDeleteNoSuchBlock(t, factory)
77
78         testStatus(t, factory)
79
80         testMetrics(t, factory)
81
82         testString(t, factory)
83
84         testUpdateReadOnly(t, factory)
85
86         testGetConcurrent(t, factory)
87         testPutConcurrent(t, factory)
88
89         testPutFullBlock(t, factory)
90
91         testTrashUntrash(t, factory)
92         testTrashEmptyTrashUntrash(t, factory)
93 }
94
95 // Put a test block, get it and verify content
96 // Test should pass for both writable and read-only volumes
97 func testGet(t TB, factory TestableVolumeFactory) {
98         v := factory(t)
99         defer v.Teardown()
100
101         v.PutRaw(TestHash, TestBlock)
102
103         buf := make([]byte, BlockSize)
104         n, err := v.Get(context.Background(), TestHash, buf)
105         if err != nil {
106                 t.Fatal(err)
107         }
108
109         if bytes.Compare(buf[:n], TestBlock) != 0 {
110                 t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
111         }
112 }
113
114 // Invoke get on a block that does not exist in volume; should result in error
115 // Test should pass for both writable and read-only volumes
116 func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
117         v := factory(t)
118         defer v.Teardown()
119
120         buf := make([]byte, BlockSize)
121         if _, err := v.Get(context.Background(), TestHash2, buf); err == nil {
122                 t.Errorf("Expected error while getting non-existing block %v", TestHash2)
123         }
124 }
125
126 // Compare() should return os.ErrNotExist if the block does not exist.
127 // Otherwise, writing new data causes CompareAndTouch() to generate
128 // error logs even though everything is working fine.
129 func testCompareNonexistent(t TB, factory TestableVolumeFactory) {
130         v := factory(t)
131         defer v.Teardown()
132
133         err := v.Compare(context.Background(), TestHash, TestBlock)
134         if err != os.ErrNotExist {
135                 t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
136         }
137 }
138
139 // Put a test block and compare the locator with same content
140 // Test should pass for both writable and read-only volumes
141 func testCompareSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
142         v := factory(t)
143         defer v.Teardown()
144
145         v.PutRaw(testHash, testData)
146
147         // Compare the block locator with same content
148         err := v.Compare(context.Background(), testHash, testData)
149         if err != nil {
150                 t.Errorf("Got err %q, expected nil", err)
151         }
152 }
153
154 // Test behavior of Compare() when stored data matches expected
155 // checksum but differs from new data we need to store. Requires
156 // testHash = md5(testDataA).
157 //
158 // Test should pass for both writable and read-only volumes
159 func testCompareWithCollision(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
160         v := factory(t)
161         defer v.Teardown()
162
163         v.PutRaw(testHash, testDataA)
164
165         // Compare the block locator with different content; collision
166         err := v.Compare(context.Background(), TestHash, testDataB)
167         if err == nil {
168                 t.Errorf("Got err nil, expected error due to collision")
169         }
170 }
171
172 // Test behavior of Compare() when stored data has become
173 // corrupted. Requires testHash = md5(testDataA) != md5(testDataB).
174 //
175 // Test should pass for both writable and read-only volumes
176 func testCompareWithCorruptStoredData(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
177         v := factory(t)
178         defer v.Teardown()
179
180         v.PutRaw(TestHash, testDataB)
181
182         err := v.Compare(context.Background(), testHash, testDataA)
183         if err == nil || err == CollisionError {
184                 t.Errorf("Got err %+v, expected non-collision error", err)
185         }
186 }
187
188 // Put a block and put again with same content
189 // Test is intended for only writable volumes
190 func testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
191         v := factory(t)
192         defer v.Teardown()
193
194         if v.Writable() == false {
195                 return
196         }
197
198         err := v.Put(context.Background(), testHash, testData)
199         if err != nil {
200                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
201         }
202
203         err = v.Put(context.Background(), testHash, testData)
204         if err != nil {
205                 t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
206         }
207 }
208
209 // Put a block and put again with different content
210 // Test is intended for only writable volumes
211 func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
212         v := factory(t)
213         defer v.Teardown()
214
215         if v.Writable() == false {
216                 return
217         }
218
219         v.PutRaw(testHash, testDataA)
220
221         putErr := v.Put(context.Background(), testHash, testDataB)
222         buf := make([]byte, BlockSize)
223         n, getErr := v.Get(context.Background(), testHash, buf)
224         if putErr == nil {
225                 // Put must not return a nil error unless it has
226                 // overwritten the existing data.
227                 if bytes.Compare(buf[:n], testDataB) != 0 {
228                         t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB)
229                 }
230         } else {
231                 // It is permissible for Put to fail, but it must
232                 // leave us with either the original data, the new
233                 // data, or nothing at all.
234                 if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 {
235                         t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB)
236                 }
237         }
238 }
239
240 // Put and get multiple blocks
241 // Test is intended for only writable volumes
242 func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
243         v := factory(t)
244         defer v.Teardown()
245
246         if v.Writable() == false {
247                 return
248         }
249
250         err := v.Put(context.Background(), TestHash, TestBlock)
251         if err != nil {
252                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
253         }
254
255         err = v.Put(context.Background(), TestHash2, TestBlock2)
256         if err != nil {
257                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
258         }
259
260         err = v.Put(context.Background(), TestHash3, TestBlock3)
261         if err != nil {
262                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
263         }
264
265         data := make([]byte, BlockSize)
266         n, err := v.Get(context.Background(), TestHash, data)
267         if err != nil {
268                 t.Error(err)
269         } else {
270                 if bytes.Compare(data[:n], TestBlock) != 0 {
271                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock)
272                 }
273         }
274
275         n, err = v.Get(context.Background(), TestHash2, data)
276         if err != nil {
277                 t.Error(err)
278         } else {
279                 if bytes.Compare(data[:n], TestBlock2) != 0 {
280                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2)
281                 }
282         }
283
284         n, err = v.Get(context.Background(), TestHash3, data)
285         if err != nil {
286                 t.Error(err)
287         } else {
288                 if bytes.Compare(data[:n], TestBlock3) != 0 {
289                         t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3)
290                 }
291         }
292 }
293
294 // testPutAndTouch
295 //   Test that when applying PUT to a block that already exists,
296 //   the block's modification time is updated.
297 // Test is intended for only writable volumes
298 func testPutAndTouch(t TB, factory TestableVolumeFactory) {
299         v := factory(t)
300         defer v.Teardown()
301
302         if v.Writable() == false {
303                 return
304         }
305
306         if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
307                 t.Error(err)
308         }
309
310         // We'll verify { t0 < threshold < t1 }, where t0 is the
311         // existing block's timestamp on disk before Put() and t1 is
312         // its timestamp after Put().
313         threshold := time.Now().Add(-time.Second)
314
315         // Set the stored block's mtime far enough in the past that we
316         // can see the difference between "timestamp didn't change"
317         // and "timestamp granularity is too low".
318         v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
319
320         // Make sure v.Mtime() agrees the above Utime really worked.
321         if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
322                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
323         }
324
325         // Write the same block again.
326         if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
327                 t.Error(err)
328         }
329
330         // Verify threshold < t1
331         if t1, err := v.Mtime(TestHash); err != nil {
332                 t.Error(err)
333         } else if t1.Before(threshold) {
334                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
335         }
336 }
337
338 // Touching a non-existing block should result in error.
339 // Test should pass for both writable and read-only volumes
340 func testTouchNoSuchBlock(t TB, factory TestableVolumeFactory) {
341         v := factory(t)
342         defer v.Teardown()
343
344         if err := v.Touch(TestHash); err == nil {
345                 t.Error("Expected error when attempted to touch a non-existing block")
346         }
347 }
348
349 // Invoking Mtime on a non-existing block should result in error.
350 // Test should pass for both writable and read-only volumes
351 func testMtimeNoSuchBlock(t TB, factory TestableVolumeFactory) {
352         v := factory(t)
353         defer v.Teardown()
354
355         if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
356                 t.Error("Expected error when updating Mtime on a non-existing block")
357         }
358 }
359
360 // Put a few blocks and invoke IndexTo with:
361 // * no prefix
362 // * with a prefix
363 // * with no such prefix
364 // Test should pass for both writable and read-only volumes
365 func testIndexTo(t TB, factory TestableVolumeFactory) {
366         v := factory(t)
367         defer v.Teardown()
368
369         // minMtime and maxMtime are the minimum and maximum
370         // acceptable values the index can report for our test
371         // blocks. 1-second precision is acceptable.
372         minMtime := time.Now().UTC().UnixNano()
373         minMtime -= minMtime % 1e9
374
375         v.PutRaw(TestHash, TestBlock)
376         v.PutRaw(TestHash2, TestBlock2)
377         v.PutRaw(TestHash3, TestBlock3)
378
379         maxMtime := time.Now().UTC().UnixNano()
380         if maxMtime%1e9 > 0 {
381                 maxMtime -= maxMtime % 1e9
382                 maxMtime += 1e9
383         }
384
385         // Blocks whose names aren't Keep hashes should be omitted from
386         // index
387         v.PutRaw("fffffffffnotreallyahashfffffffff", nil)
388         v.PutRaw("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
389         v.PutRaw("f0000000000000000000000000000000f", nil)
390         v.PutRaw("f00", nil)
391
392         buf := new(bytes.Buffer)
393         v.IndexTo("", buf)
394         indexRows := strings.Split(string(buf.Bytes()), "\n")
395         sort.Strings(indexRows)
396         sortedIndex := strings.Join(indexRows, "\n")
397         m := regexp.MustCompile(
398                 `^\n` + TestHash + `\+\d+ (\d+)\n` +
399                         TestHash3 + `\+\d+ \d+\n` +
400                         TestHash2 + `\+\d+ \d+$`,
401         ).FindStringSubmatch(sortedIndex)
402         if m == nil {
403                 t.Errorf("Got index %q for empty prefix", sortedIndex)
404         } else {
405                 mtime, err := strconv.ParseInt(m[1], 10, 64)
406                 if err != nil {
407                         t.Error(err)
408                 } else if mtime < minMtime || mtime > maxMtime {
409                         t.Errorf("got %d for TestHash timestamp, expected %d <= t <= %d",
410                                 mtime, minMtime, maxMtime)
411                 }
412         }
413
414         for _, prefix := range []string{"f", "f15", "f15ac"} {
415                 buf = new(bytes.Buffer)
416                 v.IndexTo(prefix, buf)
417
418                 m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
419                 if err != nil {
420                         t.Error(err)
421                 } else if !m {
422                         t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
423                 }
424         }
425
426         for _, prefix := range []string{"zero", "zip", "zilch"} {
427                 buf = new(bytes.Buffer)
428                 err := v.IndexTo(prefix, buf)
429                 if err != nil {
430                         t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
431                 } else if buf.Len() != 0 {
432                         t.Errorf("Expected empty list for IndexTo with no such prefix %s", prefix)
433                 }
434         }
435 }
436
437 // Calling Delete() for a block immediately after writing it (not old enough)
438 // should neither delete the data nor return an error.
439 // Test is intended for only writable volumes
440 func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
441         v := factory(t)
442         defer v.Teardown()
443         theConfig.BlobSignatureTTL.Set("5m")
444
445         if v.Writable() == false {
446                 return
447         }
448
449         v.Put(context.Background(), TestHash, TestBlock)
450
451         if err := v.Trash(TestHash); err != nil {
452                 t.Error(err)
453         }
454         data := make([]byte, BlockSize)
455         n, err := v.Get(context.Background(), TestHash, data)
456         if err != nil {
457                 t.Error(err)
458         } else if bytes.Compare(data[:n], TestBlock) != 0 {
459                 t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock)
460         }
461 }
462
463 // Calling Delete() for a block with a timestamp older than
464 // BlobSignatureTTL seconds in the past should delete the data.
465 // Test is intended for only writable volumes
466 func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
467         v := factory(t)
468         defer v.Teardown()
469         theConfig.BlobSignatureTTL.Set("5m")
470
471         if v.Writable() == false {
472                 return
473         }
474
475         v.Put(context.Background(), TestHash, TestBlock)
476         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
477
478         if err := v.Trash(TestHash); err != nil {
479                 t.Error(err)
480         }
481         data := make([]byte, BlockSize)
482         if _, err := v.Get(context.Background(), TestHash, data); err == nil || !os.IsNotExist(err) {
483                 t.Errorf("os.IsNotExist(%v) should have been true", err)
484         }
485
486         _, err := v.Mtime(TestHash)
487         if err == nil || !os.IsNotExist(err) {
488                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
489         }
490
491         err = v.Compare(context.Background(), TestHash, TestBlock)
492         if err == nil || !os.IsNotExist(err) {
493                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
494         }
495
496         indexBuf := new(bytes.Buffer)
497         v.IndexTo("", indexBuf)
498         if strings.Contains(string(indexBuf.Bytes()), TestHash) {
499                 t.Fatalf("Found trashed block in IndexTo")
500         }
501
502         err = v.Touch(TestHash)
503         if err == nil || !os.IsNotExist(err) {
504                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
505         }
506 }
507
508 // Calling Delete() for a block that does not exist should result in error.
509 // Test should pass for both writable and read-only volumes
510 func testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
511         v := factory(t)
512         defer v.Teardown()
513
514         if err := v.Trash(TestHash2); err == nil {
515                 t.Errorf("Expected error when attempting to delete a non-existing block")
516         }
517 }
518
519 // Invoke Status and verify that VolumeStatus is returned
520 // Test should pass for both writable and read-only volumes
521 func testStatus(t TB, factory TestableVolumeFactory) {
522         v := factory(t)
523         defer v.Teardown()
524
525         // Get node status and make a basic sanity check.
526         status := v.Status()
527         if status.DeviceNum == 0 {
528                 t.Errorf("uninitialized device_num in %v", status)
529         }
530
531         if status.BytesFree == 0 {
532                 t.Errorf("uninitialized bytes_free in %v", status)
533         }
534
535         if status.BytesUsed == 0 {
536                 t.Errorf("uninitialized bytes_used in %v", status)
537         }
538 }
539
540 func getValueFrom(cv *prometheus.CounterVec, lbls prometheus.Labels) float64 {
541         c, _ := cv.GetMetricWith(lbls)
542         pb := &dto.Metric{}
543         c.Write(pb)
544         return pb.GetCounter().GetValue()
545 }
546
547 func testMetrics(t TB, factory TestableVolumeFactory) {
548         var err error
549
550         v := factory(t)
551         defer v.Teardown()
552         reg := prometheus.NewRegistry()
553         vm := newVolumeMetricsVecs(reg)
554
555         err = v.Start(vm)
556         if err != nil {
557                 t.Error("Failed Start(): ", err)
558         }
559         opsC, _, ioC := v.GetMetricsVecs()
560
561         if ioC == nil {
562                 t.Error("ioBytes CounterVec is nil")
563                 return
564         }
565
566         if getValueFrom(ioC, prometheus.Labels{"direction": "out"})+
567                 getValueFrom(ioC, prometheus.Labels{"direction": "in"}) > 0 {
568                 t.Error("ioBytes counter should be zero")
569         }
570
571         if opsC == nil {
572                 t.Error("opsCounter CounterVec is nil")
573                 return
574         }
575
576         var c, anyOpCounter float64
577         anyOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": "any"})
578         // Test Put if volume is writable
579         if v.Writable() {
580                 err = v.Put(context.Background(), TestHash, TestBlock)
581                 if err != nil {
582                         t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
583                 }
584                 // Check that the operations counter increased
585                 c = getValueFrom(opsC, prometheus.Labels{"operation": "any"})
586                 if c <= anyOpCounter {
587                         t.Error("Operation(s) not counted on Put")
588                 }
589                 anyOpCounter = c
590                 // Check that bytes counter is > 0
591                 if getValueFrom(ioC, prometheus.Labels{"direction": "out"}) == 0 {
592                         t.Error("ioBytes{direction=out} counter shouldn't be zero")
593                 }
594         } else {
595                 v.PutRaw(TestHash, TestBlock)
596         }
597
598         buf := make([]byte, BlockSize)
599         _, err = v.Get(context.Background(), TestHash, buf)
600         if err != nil {
601                 t.Fatal(err)
602         }
603
604         // Check that the operations counter increased
605         c = getValueFrom(opsC, prometheus.Labels{"operation": "any"})
606         if c <= anyOpCounter {
607                 t.Error("Operation(s) not counted on Get")
608         }
609         anyOpCounter = c
610         // Check that the bytes "in" counter is > 0
611         if getValueFrom(ioC, prometheus.Labels{"direction": "in"}) == 0 {
612                 t.Error("ioBytes{direction=in} counter shouldn't be zero")
613         }
614 }
615
616 // Invoke String for the volume; expect non-empty result
617 // Test should pass for both writable and read-only volumes
618 func testString(t TB, factory TestableVolumeFactory) {
619         v := factory(t)
620         defer v.Teardown()
621
622         if id := v.String(); len(id) == 0 {
623                 t.Error("Got empty string for v.String()")
624         }
625 }
626
627 // Putting, updating, touching, and deleting blocks from a read-only volume result in error.
628 // Test is intended for only read-only volumes
629 func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
630         v := factory(t)
631         defer v.Teardown()
632
633         if v.Writable() == true {
634                 return
635         }
636
637         v.PutRaw(TestHash, TestBlock)
638         buf := make([]byte, BlockSize)
639
640         // Get from read-only volume should succeed
641         _, err := v.Get(context.Background(), TestHash, buf)
642         if err != nil {
643                 t.Errorf("got err %v, expected nil", err)
644         }
645
646         // Put a new block to read-only volume should result in error
647         err = v.Put(context.Background(), TestHash2, TestBlock2)
648         if err == nil {
649                 t.Errorf("Expected error when putting block in a read-only volume")
650         }
651         _, err = v.Get(context.Background(), TestHash2, buf)
652         if err == nil {
653                 t.Errorf("Expected error when getting block whose put in read-only volume failed")
654         }
655
656         // Touch a block in read-only volume should result in error
657         err = v.Touch(TestHash)
658         if err == nil {
659                 t.Errorf("Expected error when touching block in a read-only volume")
660         }
661
662         // Delete a block from a read-only volume should result in error
663         err = v.Trash(TestHash)
664         if err == nil {
665                 t.Errorf("Expected error when deleting block from a read-only volume")
666         }
667
668         // Overwriting an existing block in read-only volume should result in error
669         err = v.Put(context.Background(), TestHash, TestBlock)
670         if err == nil {
671                 t.Errorf("Expected error when putting block in a read-only volume")
672         }
673 }
674
675 // Launch concurrent Gets
676 // Test should pass for both writable and read-only volumes
677 func testGetConcurrent(t TB, factory TestableVolumeFactory) {
678         v := factory(t)
679         defer v.Teardown()
680
681         v.PutRaw(TestHash, TestBlock)
682         v.PutRaw(TestHash2, TestBlock2)
683         v.PutRaw(TestHash3, TestBlock3)
684
685         sem := make(chan int)
686         go func() {
687                 buf := make([]byte, BlockSize)
688                 n, err := v.Get(context.Background(), TestHash, buf)
689                 if err != nil {
690                         t.Errorf("err1: %v", err)
691                 }
692                 if bytes.Compare(buf[:n], TestBlock) != 0 {
693                         t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n]))
694                 }
695                 sem <- 1
696         }()
697
698         go func() {
699                 buf := make([]byte, BlockSize)
700                 n, err := v.Get(context.Background(), TestHash2, buf)
701                 if err != nil {
702                         t.Errorf("err2: %v", err)
703                 }
704                 if bytes.Compare(buf[:n], TestBlock2) != 0 {
705                         t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n]))
706                 }
707                 sem <- 1
708         }()
709
710         go func() {
711                 buf := make([]byte, BlockSize)
712                 n, err := v.Get(context.Background(), TestHash3, buf)
713                 if err != nil {
714                         t.Errorf("err3: %v", err)
715                 }
716                 if bytes.Compare(buf[:n], TestBlock3) != 0 {
717                         t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n]))
718                 }
719                 sem <- 1
720         }()
721
722         // Wait for all goroutines to finish
723         for done := 0; done < 3; done++ {
724                 <-sem
725         }
726 }
727
728 // Launch concurrent Puts
729 // Test is intended for only writable volumes
730 func testPutConcurrent(t TB, factory TestableVolumeFactory) {
731         v := factory(t)
732         defer v.Teardown()
733
734         if v.Writable() == false {
735                 return
736         }
737
738         sem := make(chan int)
739         go func(sem chan int) {
740                 err := v.Put(context.Background(), TestHash, TestBlock)
741                 if err != nil {
742                         t.Errorf("err1: %v", err)
743                 }
744                 sem <- 1
745         }(sem)
746
747         go func(sem chan int) {
748                 err := v.Put(context.Background(), TestHash2, TestBlock2)
749                 if err != nil {
750                         t.Errorf("err2: %v", err)
751                 }
752                 sem <- 1
753         }(sem)
754
755         go func(sem chan int) {
756                 err := v.Put(context.Background(), TestHash3, TestBlock3)
757                 if err != nil {
758                         t.Errorf("err3: %v", err)
759                 }
760                 sem <- 1
761         }(sem)
762
763         // Wait for all goroutines to finish
764         for done := 0; done < 3; done++ {
765                 <-sem
766         }
767
768         // Double check that we actually wrote the blocks we expected to write.
769         buf := make([]byte, BlockSize)
770         n, err := v.Get(context.Background(), TestHash, buf)
771         if err != nil {
772                 t.Errorf("Get #1: %v", err)
773         }
774         if bytes.Compare(buf[:n], TestBlock) != 0 {
775                 t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
776         }
777
778         n, err = v.Get(context.Background(), TestHash2, buf)
779         if err != nil {
780                 t.Errorf("Get #2: %v", err)
781         }
782         if bytes.Compare(buf[:n], TestBlock2) != 0 {
783                 t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
784         }
785
786         n, err = v.Get(context.Background(), TestHash3, buf)
787         if err != nil {
788                 t.Errorf("Get #3: %v", err)
789         }
790         if bytes.Compare(buf[:n], TestBlock3) != 0 {
791                 t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n]))
792         }
793 }
794
795 // Write and read back a full size block
796 func testPutFullBlock(t TB, factory TestableVolumeFactory) {
797         v := factory(t)
798         defer v.Teardown()
799
800         if !v.Writable() {
801                 return
802         }
803
804         wdata := make([]byte, BlockSize)
805         wdata[0] = 'a'
806         wdata[BlockSize-1] = 'z'
807         hash := fmt.Sprintf("%x", md5.Sum(wdata))
808         err := v.Put(context.Background(), hash, wdata)
809         if err != nil {
810                 t.Fatal(err)
811         }
812         buf := make([]byte, BlockSize)
813         n, err := v.Get(context.Background(), hash, buf)
814         if err != nil {
815                 t.Error(err)
816         }
817         if bytes.Compare(buf[:n], wdata) != 0 {
818                 t.Error("buf %+q != wdata %+q", buf[:n], wdata)
819         }
820 }
821
822 // With TrashLifetime != 0, perform:
823 // Trash an old block - which either raises ErrNotImplemented or succeeds
824 // Untrash -  which either raises ErrNotImplemented or succeeds
825 // Get - which must succeed
826 func testTrashUntrash(t TB, factory TestableVolumeFactory) {
827         v := factory(t)
828         defer v.Teardown()
829         defer func() {
830                 theConfig.TrashLifetime = 0
831         }()
832
833         theConfig.TrashLifetime.Set("1h")
834
835         // put block and backdate it
836         v.PutRaw(TestHash, TestBlock)
837         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
838
839         buf := make([]byte, BlockSize)
840         n, err := v.Get(context.Background(), TestHash, buf)
841         if err != nil {
842                 t.Fatal(err)
843         }
844         if bytes.Compare(buf[:n], TestBlock) != 0 {
845                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
846         }
847
848         // Trash
849         err = v.Trash(TestHash)
850         if v.Writable() == false {
851                 if err != MethodDisabledError {
852                         t.Fatal(err)
853                 }
854         } else if err != nil {
855                 if err != ErrNotImplemented {
856                         t.Fatal(err)
857                 }
858         } else {
859                 _, err = v.Get(context.Background(), TestHash, buf)
860                 if err == nil || !os.IsNotExist(err) {
861                         t.Errorf("os.IsNotExist(%v) should have been true", err)
862                 }
863
864                 // Untrash
865                 err = v.Untrash(TestHash)
866                 if err != nil {
867                         t.Fatal(err)
868                 }
869         }
870
871         // Get the block - after trash and untrash sequence
872         n, err = v.Get(context.Background(), TestHash, buf)
873         if err != nil {
874                 t.Fatal(err)
875         }
876         if bytes.Compare(buf[:n], TestBlock) != 0 {
877                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
878         }
879 }
880
881 func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
882         v := factory(t)
883         defer v.Teardown()
884         defer func(orig arvados.Duration) {
885                 theConfig.TrashLifetime = orig
886         }(theConfig.TrashLifetime)
887
888         checkGet := func() error {
889                 buf := make([]byte, BlockSize)
890                 n, err := v.Get(context.Background(), TestHash, buf)
891                 if err != nil {
892                         return err
893                 }
894                 if bytes.Compare(buf[:n], TestBlock) != 0 {
895                         t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
896                 }
897
898                 _, err = v.Mtime(TestHash)
899                 if err != nil {
900                         return err
901                 }
902
903                 err = v.Compare(context.Background(), TestHash, TestBlock)
904                 if err != nil {
905                         return err
906                 }
907
908                 indexBuf := new(bytes.Buffer)
909                 v.IndexTo("", indexBuf)
910                 if !strings.Contains(string(indexBuf.Bytes()), TestHash) {
911                         return os.ErrNotExist
912                 }
913
914                 return nil
915         }
916
917         // First set: EmptyTrash before reaching the trash deadline.
918
919         theConfig.TrashLifetime.Set("1h")
920
921         v.PutRaw(TestHash, TestBlock)
922         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
923
924         err := checkGet()
925         if err != nil {
926                 t.Fatal(err)
927         }
928
929         // Trash the block
930         err = v.Trash(TestHash)
931         if err == MethodDisabledError || err == ErrNotImplemented {
932                 // Skip the trash tests for read-only volumes, and
933                 // volume types that don't support TrashLifetime>0.
934                 return
935         }
936
937         err = checkGet()
938         if err == nil || !os.IsNotExist(err) {
939                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
940         }
941
942         err = v.Touch(TestHash)
943         if err == nil || !os.IsNotExist(err) {
944                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
945         }
946
947         v.EmptyTrash()
948
949         // Even after emptying the trash, we can untrash our block
950         // because the deadline hasn't been reached.
951         err = v.Untrash(TestHash)
952         if err != nil {
953                 t.Fatal(err)
954         }
955
956         err = checkGet()
957         if err != nil {
958                 t.Fatal(err)
959         }
960
961         err = v.Touch(TestHash)
962         if err != nil {
963                 t.Fatal(err)
964         }
965
966         // Because we Touch'ed, need to backdate again for next set of tests
967         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
968
969         // If the only block in the trash has already been untrashed,
970         // most volumes will fail a subsequent Untrash with a 404, but
971         // it's also acceptable for Untrash to succeed.
972         err = v.Untrash(TestHash)
973         if err != nil && !os.IsNotExist(err) {
974                 t.Fatalf("Expected success or os.IsNotExist(), but got: %v", err)
975         }
976
977         // The additional Untrash should not interfere with our
978         // already-untrashed copy.
979         err = checkGet()
980         if err != nil {
981                 t.Fatal(err)
982         }
983
984         // Untrash might have updated the timestamp, so backdate again
985         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
986
987         // Second set: EmptyTrash after the trash deadline has passed.
988
989         theConfig.TrashLifetime.Set("1ns")
990
991         err = v.Trash(TestHash)
992         if err != nil {
993                 t.Fatal(err)
994         }
995         err = checkGet()
996         if err == nil || !os.IsNotExist(err) {
997                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
998         }
999
1000         // Even though 1ns has passed, we can untrash because we
1001         // haven't called EmptyTrash yet.
1002         err = v.Untrash(TestHash)
1003         if err != nil {
1004                 t.Fatal(err)
1005         }
1006         err = checkGet()
1007         if err != nil {
1008                 t.Fatal(err)
1009         }
1010
1011         // Trash it again, and this time call EmptyTrash so it really
1012         // goes away.
1013         // (In Azure volumes, un/trash changes Mtime, so first backdate again)
1014         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
1015         _ = v.Trash(TestHash)
1016         err = checkGet()
1017         if err == nil || !os.IsNotExist(err) {
1018                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1019         }
1020         v.EmptyTrash()
1021
1022         // Untrash won't find it
1023         err = v.Untrash(TestHash)
1024         if err == nil || !os.IsNotExist(err) {
1025                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1026         }
1027
1028         // Get block won't find it
1029         err = checkGet()
1030         if err == nil || !os.IsNotExist(err) {
1031                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1032         }
1033
1034         // Third set: If the same data block gets written again after
1035         // being trashed, and then the trash gets emptied, the newer
1036         // un-trashed copy doesn't get deleted along with it.
1037
1038         v.PutRaw(TestHash, TestBlock)
1039         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
1040
1041         theConfig.TrashLifetime.Set("1ns")
1042         err = v.Trash(TestHash)
1043         if err != nil {
1044                 t.Fatal(err)
1045         }
1046         err = checkGet()
1047         if err == nil || !os.IsNotExist(err) {
1048                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1049         }
1050
1051         v.PutRaw(TestHash, TestBlock)
1052         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
1053
1054         // EmptyTrash should not delete the untrashed copy.
1055         v.EmptyTrash()
1056         err = checkGet()
1057         if err != nil {
1058                 t.Fatal(err)
1059         }
1060
1061         // Fourth set: If the same data block gets trashed twice with
1062         // different deadlines A and C, and then the trash is emptied
1063         // at intermediate time B (A < B < C), it is still possible to
1064         // untrash the block whose deadline is "C".
1065
1066         v.PutRaw(TestHash, TestBlock)
1067         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
1068
1069         theConfig.TrashLifetime.Set("1ns")
1070         err = v.Trash(TestHash)
1071         if err != nil {
1072                 t.Fatal(err)
1073         }
1074
1075         v.PutRaw(TestHash, TestBlock)
1076         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
1077
1078         theConfig.TrashLifetime.Set("1h")
1079         err = v.Trash(TestHash)
1080         if err != nil {
1081                 t.Fatal(err)
1082         }
1083
1084         // EmptyTrash should not prevent us from recovering the
1085         // time.Hour ("C") trash
1086         v.EmptyTrash()
1087         err = v.Untrash(TestHash)
1088         if err != nil {
1089                 t.Fatal(err)
1090         }
1091         err = checkGet()
1092         if err != nil {
1093                 t.Fatal(err)
1094         }
1095 }