Merge branch '13593-async-perm-graph-update'
[arvados.git] / services / keepstore / volume_generic_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "os"
13         "regexp"
14         "sort"
15         "strconv"
16         "strings"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
21         "github.com/prometheus/client_golang/prometheus"
22         dto "github.com/prometheus/client_model/go"
23 )
24
25 type TB interface {
26         Error(args ...interface{})
27         Errorf(format string, args ...interface{})
28         Fail()
29         FailNow()
30         Failed() bool
31         Fatal(args ...interface{})
32         Fatalf(format string, args ...interface{})
33         Log(args ...interface{})
34         Logf(format string, args ...interface{})
35 }
36
37 // A TestableVolumeFactory returns a new TestableVolume. The factory
38 // function, and the TestableVolume it returns, can use "t" to write
39 // logs, fail the current test, etc.
40 type TestableVolumeFactory func(t TB) TestableVolume
41
42 // DoGenericVolumeTests runs a set of tests that every TestableVolume
43 // is expected to pass. It calls factory to create a new TestableVolume
44 // for each test case, to avoid leaking state between tests.
45 func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
46         testGet(t, factory)
47         testGetNoSuchBlock(t, factory)
48
49         testCompareNonexistent(t, factory)
50         testCompareSameContent(t, factory, TestHash, TestBlock)
51         testCompareSameContent(t, factory, EmptyHash, EmptyBlock)
52         testCompareWithCollision(t, factory, TestHash, TestBlock, []byte("baddata"))
53         testCompareWithCollision(t, factory, TestHash, TestBlock, EmptyBlock)
54         testCompareWithCollision(t, factory, EmptyHash, EmptyBlock, TestBlock)
55         testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, []byte("baddata"))
56         testCompareWithCorruptStoredData(t, factory, TestHash, TestBlock, EmptyBlock)
57         testCompareWithCorruptStoredData(t, factory, EmptyHash, EmptyBlock, []byte("baddata"))
58
59         testPutBlockWithSameContent(t, factory, TestHash, TestBlock)
60         testPutBlockWithSameContent(t, factory, EmptyHash, EmptyBlock)
61         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], arvadostest.MD5CollisionData[1])
62         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, EmptyBlock, arvadostest.MD5CollisionData[0])
63         testPutBlockWithDifferentContent(t, factory, arvadostest.MD5CollisionMD5, arvadostest.MD5CollisionData[0], EmptyBlock)
64         testPutBlockWithDifferentContent(t, factory, EmptyHash, EmptyBlock, arvadostest.MD5CollisionData[0])
65         testPutMultipleBlocks(t, factory)
66
67         testPutAndTouch(t, factory)
68         testTouchNoSuchBlock(t, factory)
69
70         testMtimeNoSuchBlock(t, factory)
71
72         testIndexTo(t, factory)
73
74         testDeleteNewBlock(t, factory)
75         testDeleteOldBlock(t, factory)
76         testDeleteNoSuchBlock(t, factory)
77
78         testStatus(t, factory)
79
80         testMetrics(t, factory)
81
82         testString(t, factory)
83
84         testUpdateReadOnly(t, factory)
85
86         testGetConcurrent(t, factory)
87         testPutConcurrent(t, factory)
88
89         testPutFullBlock(t, factory)
90
91         testTrashUntrash(t, factory)
92         testTrashEmptyTrashUntrash(t, factory)
93 }
94
95 // Put a test block, get it and verify content
96 // Test should pass for both writable and read-only volumes
97 func testGet(t TB, factory TestableVolumeFactory) {
98         v := factory(t)
99         defer v.Teardown()
100
101         v.PutRaw(TestHash, TestBlock)
102
103         buf := make([]byte, BlockSize)
104         n, err := v.Get(context.Background(), TestHash, buf)
105         if err != nil {
106                 t.Fatal(err)
107         }
108
109         if bytes.Compare(buf[:n], TestBlock) != 0 {
110                 t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
111         }
112 }
113
114 // Invoke get on a block that does not exist in volume; should result in error
115 // Test should pass for both writable and read-only volumes
116 func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
117         v := factory(t)
118         defer v.Teardown()
119
120         buf := make([]byte, BlockSize)
121         if _, err := v.Get(context.Background(), TestHash2, buf); err == nil {
122                 t.Errorf("Expected error while getting non-existing block %v", TestHash2)
123         }
124 }
125
126 // Compare() should return os.ErrNotExist if the block does not exist.
127 // Otherwise, writing new data causes CompareAndTouch() to generate
128 // error logs even though everything is working fine.
129 func testCompareNonexistent(t TB, factory TestableVolumeFactory) {
130         v := factory(t)
131         defer v.Teardown()
132
133         err := v.Compare(context.Background(), TestHash, TestBlock)
134         if err != os.ErrNotExist {
135                 t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
136         }
137 }
138
139 // Put a test block and compare the locator with same content
140 // Test should pass for both writable and read-only volumes
141 func testCompareSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
142         v := factory(t)
143         defer v.Teardown()
144
145         v.PutRaw(testHash, testData)
146
147         // Compare the block locator with same content
148         err := v.Compare(context.Background(), testHash, testData)
149         if err != nil {
150                 t.Errorf("Got err %q, expected nil", err)
151         }
152 }
153
154 // Test behavior of Compare() when stored data matches expected
155 // checksum but differs from new data we need to store. Requires
156 // testHash = md5(testDataA).
157 //
158 // Test should pass for both writable and read-only volumes
159 func testCompareWithCollision(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
160         v := factory(t)
161         defer v.Teardown()
162
163         v.PutRaw(testHash, testDataA)
164
165         // Compare the block locator with different content; collision
166         err := v.Compare(context.Background(), TestHash, testDataB)
167         if err == nil {
168                 t.Errorf("Got err nil, expected error due to collision")
169         }
170 }
171
172 // Test behavior of Compare() when stored data has become
173 // corrupted. Requires testHash = md5(testDataA) != md5(testDataB).
174 //
175 // Test should pass for both writable and read-only volumes
176 func testCompareWithCorruptStoredData(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
177         v := factory(t)
178         defer v.Teardown()
179
180         v.PutRaw(TestHash, testDataB)
181
182         err := v.Compare(context.Background(), testHash, testDataA)
183         if err == nil || err == CollisionError {
184                 t.Errorf("Got err %+v, expected non-collision error", err)
185         }
186 }
187
188 // Put a block and put again with same content
189 // Test is intended for only writable volumes
190 func testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash string, testData []byte) {
191         v := factory(t)
192         defer v.Teardown()
193
194         if v.Writable() == false {
195                 return
196         }
197
198         err := v.Put(context.Background(), testHash, testData)
199         if err != nil {
200                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
201         }
202
203         err = v.Put(context.Background(), testHash, testData)
204         if err != nil {
205                 t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
206         }
207 }
208
209 // Put a block and put again with different content
210 // Test is intended for only writable volumes
211 func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testHash string, testDataA, testDataB []byte) {
212         v := factory(t)
213         defer v.Teardown()
214
215         if v.Writable() == false {
216                 return
217         }
218
219         v.PutRaw(testHash, testDataA)
220
221         putErr := v.Put(context.Background(), testHash, testDataB)
222         buf := make([]byte, BlockSize)
223         n, getErr := v.Get(context.Background(), testHash, buf)
224         if putErr == nil {
225                 // Put must not return a nil error unless it has
226                 // overwritten the existing data.
227                 if bytes.Compare(buf[:n], testDataB) != 0 {
228                         t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB)
229                 }
230         } else {
231                 // It is permissible for Put to fail, but it must
232                 // leave us with either the original data, the new
233                 // data, or nothing at all.
234                 if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 {
235                         t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB)
236                 }
237         }
238 }
239
240 // Put and get multiple blocks
241 // Test is intended for only writable volumes
242 func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
243         v := factory(t)
244         defer v.Teardown()
245
246         if v.Writable() == false {
247                 return
248         }
249
250         err := v.Put(context.Background(), TestHash, TestBlock)
251         if err != nil {
252                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
253         }
254
255         err = v.Put(context.Background(), TestHash2, TestBlock2)
256         if err != nil {
257                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
258         }
259
260         err = v.Put(context.Background(), TestHash3, TestBlock3)
261         if err != nil {
262                 t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
263         }
264
265         data := make([]byte, BlockSize)
266         n, err := v.Get(context.Background(), TestHash, data)
267         if err != nil {
268                 t.Error(err)
269         } else {
270                 if bytes.Compare(data[:n], TestBlock) != 0 {
271                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock)
272                 }
273         }
274
275         n, err = v.Get(context.Background(), TestHash2, data)
276         if err != nil {
277                 t.Error(err)
278         } else {
279                 if bytes.Compare(data[:n], TestBlock2) != 0 {
280                         t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2)
281                 }
282         }
283
284         n, err = v.Get(context.Background(), TestHash3, data)
285         if err != nil {
286                 t.Error(err)
287         } else {
288                 if bytes.Compare(data[:n], TestBlock3) != 0 {
289                         t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3)
290                 }
291         }
292 }
293
294 // testPutAndTouch
295 //   Test that when applying PUT to a block that already exists,
296 //   the block's modification time is updated.
297 // Test is intended for only writable volumes
298 func testPutAndTouch(t TB, factory TestableVolumeFactory) {
299         v := factory(t)
300         defer v.Teardown()
301
302         if v.Writable() == false {
303                 return
304         }
305
306         if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
307                 t.Error(err)
308         }
309
310         // We'll verify { t0 < threshold < t1 }, where t0 is the
311         // existing block's timestamp on disk before Put() and t1 is
312         // its timestamp after Put().
313         threshold := time.Now().Add(-time.Second)
314
315         // Set the stored block's mtime far enough in the past that we
316         // can see the difference between "timestamp didn't change"
317         // and "timestamp granularity is too low".
318         v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
319
320         // Make sure v.Mtime() agrees the above Utime really worked.
321         if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
322                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
323         }
324
325         // Write the same block again.
326         if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
327                 t.Error(err)
328         }
329
330         // Verify threshold < t1
331         if t1, err := v.Mtime(TestHash); err != nil {
332                 t.Error(err)
333         } else if t1.Before(threshold) {
334                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
335         }
336 }
337
338 // Touching a non-existing block should result in error.
339 // Test should pass for both writable and read-only volumes
340 func testTouchNoSuchBlock(t TB, factory TestableVolumeFactory) {
341         v := factory(t)
342         defer v.Teardown()
343
344         if err := v.Touch(TestHash); err == nil {
345                 t.Error("Expected error when attempted to touch a non-existing block")
346         }
347 }
348
349 // Invoking Mtime on a non-existing block should result in error.
350 // Test should pass for both writable and read-only volumes
351 func testMtimeNoSuchBlock(t TB, factory TestableVolumeFactory) {
352         v := factory(t)
353         defer v.Teardown()
354
355         if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
356                 t.Error("Expected error when updating Mtime on a non-existing block")
357         }
358 }
359
360 // Put a few blocks and invoke IndexTo with:
361 // * no prefix
362 // * with a prefix
363 // * with no such prefix
364 // Test should pass for both writable and read-only volumes
365 func testIndexTo(t TB, factory TestableVolumeFactory) {
366         v := factory(t)
367         defer v.Teardown()
368
369         // minMtime and maxMtime are the minimum and maximum
370         // acceptable values the index can report for our test
371         // blocks. 1-second precision is acceptable.
372         minMtime := time.Now().UTC().UnixNano()
373         minMtime -= minMtime % 1e9
374
375         v.PutRaw(TestHash, TestBlock)
376         v.PutRaw(TestHash2, TestBlock2)
377         v.PutRaw(TestHash3, TestBlock3)
378
379         maxMtime := time.Now().UTC().UnixNano()
380         if maxMtime%1e9 > 0 {
381                 maxMtime -= maxMtime % 1e9
382                 maxMtime += 1e9
383         }
384
385         // Blocks whose names aren't Keep hashes should be omitted from
386         // index
387         v.PutRaw("fffffffffnotreallyahashfffffffff", nil)
388         v.PutRaw("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", nil)
389         v.PutRaw("f0000000000000000000000000000000f", nil)
390         v.PutRaw("f00", nil)
391
392         buf := new(bytes.Buffer)
393         v.IndexTo("", buf)
394         indexRows := strings.Split(string(buf.Bytes()), "\n")
395         sort.Strings(indexRows)
396         sortedIndex := strings.Join(indexRows, "\n")
397         m := regexp.MustCompile(
398                 `^\n` + TestHash + `\+\d+ (\d+)\n` +
399                         TestHash3 + `\+\d+ \d+\n` +
400                         TestHash2 + `\+\d+ \d+$`,
401         ).FindStringSubmatch(sortedIndex)
402         if m == nil {
403                 t.Errorf("Got index %q for empty prefix", sortedIndex)
404         } else {
405                 mtime, err := strconv.ParseInt(m[1], 10, 64)
406                 if err != nil {
407                         t.Error(err)
408                 } else if mtime < minMtime || mtime > maxMtime {
409                         t.Errorf("got %d for TestHash timestamp, expected %d <= t <= %d",
410                                 mtime, minMtime, maxMtime)
411                 }
412         }
413
414         for _, prefix := range []string{"f", "f15", "f15ac"} {
415                 buf = new(bytes.Buffer)
416                 v.IndexTo(prefix, buf)
417
418                 m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
419                 if err != nil {
420                         t.Error(err)
421                 } else if !m {
422                         t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
423                 }
424         }
425
426         for _, prefix := range []string{"zero", "zip", "zilch"} {
427                 buf = new(bytes.Buffer)
428                 err := v.IndexTo(prefix, buf)
429                 if err != nil {
430                         t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
431                 } else if buf.Len() != 0 {
432                         t.Errorf("Expected empty list for IndexTo with no such prefix %s", prefix)
433                 }
434         }
435 }
436
437 // Calling Delete() for a block immediately after writing it (not old enough)
438 // should neither delete the data nor return an error.
439 // Test is intended for only writable volumes
440 func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
441         v := factory(t)
442         defer v.Teardown()
443         theConfig.BlobSignatureTTL.Set("5m")
444
445         if v.Writable() == false {
446                 return
447         }
448
449         v.Put(context.Background(), TestHash, TestBlock)
450
451         if err := v.Trash(TestHash); err != nil {
452                 t.Error(err)
453         }
454         data := make([]byte, BlockSize)
455         n, err := v.Get(context.Background(), TestHash, data)
456         if err != nil {
457                 t.Error(err)
458         } else if bytes.Compare(data[:n], TestBlock) != 0 {
459                 t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock)
460         }
461 }
462
463 // Calling Delete() for a block with a timestamp older than
464 // BlobSignatureTTL seconds in the past should delete the data.
465 // Test is intended for only writable volumes
466 func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
467         v := factory(t)
468         defer v.Teardown()
469         theConfig.BlobSignatureTTL.Set("5m")
470
471         if v.Writable() == false {
472                 return
473         }
474
475         v.Put(context.Background(), TestHash, TestBlock)
476         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
477
478         if err := v.Trash(TestHash); err != nil {
479                 t.Error(err)
480         }
481         data := make([]byte, BlockSize)
482         if _, err := v.Get(context.Background(), TestHash, data); err == nil || !os.IsNotExist(err) {
483                 t.Errorf("os.IsNotExist(%v) should have been true", err)
484         }
485
486         _, err := v.Mtime(TestHash)
487         if err == nil || !os.IsNotExist(err) {
488                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
489         }
490
491         err = v.Compare(context.Background(), TestHash, TestBlock)
492         if err == nil || !os.IsNotExist(err) {
493                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
494         }
495
496         indexBuf := new(bytes.Buffer)
497         v.IndexTo("", indexBuf)
498         if strings.Contains(string(indexBuf.Bytes()), TestHash) {
499                 t.Fatalf("Found trashed block in IndexTo")
500         }
501
502         err = v.Touch(TestHash)
503         if err == nil || !os.IsNotExist(err) {
504                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
505         }
506 }
507
508 // Calling Delete() for a block that does not exist should result in error.
509 // Test should pass for both writable and read-only volumes
510 func testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
511         v := factory(t)
512         defer v.Teardown()
513
514         if err := v.Trash(TestHash2); err == nil {
515                 t.Errorf("Expected error when attempting to delete a non-existing block")
516         }
517 }
518
519 // Invoke Status and verify that VolumeStatus is returned
520 // Test should pass for both writable and read-only volumes
521 func testStatus(t TB, factory TestableVolumeFactory) {
522         v := factory(t)
523         defer v.Teardown()
524
525         // Get node status and make a basic sanity check.
526         status := v.Status()
527         if status.DeviceNum == 0 {
528                 t.Errorf("uninitialized device_num in %v", status)
529         }
530
531         if status.BytesFree == 0 {
532                 t.Errorf("uninitialized bytes_free in %v", status)
533         }
534
535         if status.BytesUsed == 0 {
536                 t.Errorf("uninitialized bytes_used in %v", status)
537         }
538 }
539
540 func getValueFrom(cv *prometheus.CounterVec, lbls prometheus.Labels) float64 {
541         c, _ := cv.GetMetricWith(lbls)
542         pb := &dto.Metric{}
543         c.Write(pb)
544         return pb.GetCounter().GetValue()
545 }
546
547 func testMetrics(t TB, factory TestableVolumeFactory) {
548         var err error
549
550         v := factory(t)
551         defer v.Teardown()
552         reg := prometheus.NewRegistry()
553         vm := newVolumeMetricsVecs(reg)
554
555         err = v.Start(vm)
556         if err != nil {
557                 t.Error("Failed Start(): ", err)
558         }
559         opsC, _, ioC := vm.getCounterVecsFor(prometheus.Labels{"device_id": v.DeviceID()})
560
561         if ioC == nil {
562                 t.Error("ioBytes CounterVec is nil")
563                 return
564         }
565
566         if getValueFrom(ioC, prometheus.Labels{"direction": "out"})+
567                 getValueFrom(ioC, prometheus.Labels{"direction": "in"}) > 0 {
568                 t.Error("ioBytes counter should be zero")
569         }
570
571         if opsC == nil {
572                 t.Error("opsCounter CounterVec is nil")
573                 return
574         }
575
576         var c, writeOpCounter, readOpCounter float64
577
578         readOpType, writeOpType := v.ReadWriteOperationLabelValues()
579         writeOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
580         readOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
581
582         // Test Put if volume is writable
583         if v.Writable() {
584                 err = v.Put(context.Background(), TestHash, TestBlock)
585                 if err != nil {
586                         t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
587                 }
588                 // Check that the write operations counter increased
589                 c = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
590                 if c <= writeOpCounter {
591                         t.Error("Operation(s) not counted on Put")
592                 }
593                 // Check that bytes counter is > 0
594                 if getValueFrom(ioC, prometheus.Labels{"direction": "out"}) == 0 {
595                         t.Error("ioBytes{direction=out} counter shouldn't be zero")
596                 }
597         } else {
598                 v.PutRaw(TestHash, TestBlock)
599         }
600
601         buf := make([]byte, BlockSize)
602         _, err = v.Get(context.Background(), TestHash, buf)
603         if err != nil {
604                 t.Fatal(err)
605         }
606
607         // Check that the operations counter increased
608         c = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
609         if c <= readOpCounter {
610                 t.Error("Operation(s) not counted on Get")
611         }
612         // Check that the bytes "in" counter is > 0
613         if getValueFrom(ioC, prometheus.Labels{"direction": "in"}) == 0 {
614                 t.Error("ioBytes{direction=in} counter shouldn't be zero")
615         }
616 }
617
618 // Invoke String for the volume; expect non-empty result
619 // Test should pass for both writable and read-only volumes
620 func testString(t TB, factory TestableVolumeFactory) {
621         v := factory(t)
622         defer v.Teardown()
623
624         if id := v.String(); len(id) == 0 {
625                 t.Error("Got empty string for v.String()")
626         }
627 }
628
629 // Putting, updating, touching, and deleting blocks from a read-only volume result in error.
630 // Test is intended for only read-only volumes
631 func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
632         v := factory(t)
633         defer v.Teardown()
634
635         if v.Writable() == true {
636                 return
637         }
638
639         v.PutRaw(TestHash, TestBlock)
640         buf := make([]byte, BlockSize)
641
642         // Get from read-only volume should succeed
643         _, err := v.Get(context.Background(), TestHash, buf)
644         if err != nil {
645                 t.Errorf("got err %v, expected nil", err)
646         }
647
648         // Put a new block to read-only volume should result in error
649         err = v.Put(context.Background(), TestHash2, TestBlock2)
650         if err == nil {
651                 t.Errorf("Expected error when putting block in a read-only volume")
652         }
653         _, err = v.Get(context.Background(), TestHash2, buf)
654         if err == nil {
655                 t.Errorf("Expected error when getting block whose put in read-only volume failed")
656         }
657
658         // Touch a block in read-only volume should result in error
659         err = v.Touch(TestHash)
660         if err == nil {
661                 t.Errorf("Expected error when touching block in a read-only volume")
662         }
663
664         // Delete a block from a read-only volume should result in error
665         err = v.Trash(TestHash)
666         if err == nil {
667                 t.Errorf("Expected error when deleting block from a read-only volume")
668         }
669
670         // Overwriting an existing block in read-only volume should result in error
671         err = v.Put(context.Background(), TestHash, TestBlock)
672         if err == nil {
673                 t.Errorf("Expected error when putting block in a read-only volume")
674         }
675 }
676
677 // Launch concurrent Gets
678 // Test should pass for both writable and read-only volumes
679 func testGetConcurrent(t TB, factory TestableVolumeFactory) {
680         v := factory(t)
681         defer v.Teardown()
682
683         v.PutRaw(TestHash, TestBlock)
684         v.PutRaw(TestHash2, TestBlock2)
685         v.PutRaw(TestHash3, TestBlock3)
686
687         sem := make(chan int)
688         go func() {
689                 buf := make([]byte, BlockSize)
690                 n, err := v.Get(context.Background(), TestHash, buf)
691                 if err != nil {
692                         t.Errorf("err1: %v", err)
693                 }
694                 if bytes.Compare(buf[:n], TestBlock) != 0 {
695                         t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n]))
696                 }
697                 sem <- 1
698         }()
699
700         go func() {
701                 buf := make([]byte, BlockSize)
702                 n, err := v.Get(context.Background(), TestHash2, buf)
703                 if err != nil {
704                         t.Errorf("err2: %v", err)
705                 }
706                 if bytes.Compare(buf[:n], TestBlock2) != 0 {
707                         t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n]))
708                 }
709                 sem <- 1
710         }()
711
712         go func() {
713                 buf := make([]byte, BlockSize)
714                 n, err := v.Get(context.Background(), TestHash3, buf)
715                 if err != nil {
716                         t.Errorf("err3: %v", err)
717                 }
718                 if bytes.Compare(buf[:n], TestBlock3) != 0 {
719                         t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n]))
720                 }
721                 sem <- 1
722         }()
723
724         // Wait for all goroutines to finish
725         for done := 0; done < 3; done++ {
726                 <-sem
727         }
728 }
729
730 // Launch concurrent Puts
731 // Test is intended for only writable volumes
732 func testPutConcurrent(t TB, factory TestableVolumeFactory) {
733         v := factory(t)
734         defer v.Teardown()
735
736         if v.Writable() == false {
737                 return
738         }
739
740         sem := make(chan int)
741         go func(sem chan int) {
742                 err := v.Put(context.Background(), TestHash, TestBlock)
743                 if err != nil {
744                         t.Errorf("err1: %v", err)
745                 }
746                 sem <- 1
747         }(sem)
748
749         go func(sem chan int) {
750                 err := v.Put(context.Background(), TestHash2, TestBlock2)
751                 if err != nil {
752                         t.Errorf("err2: %v", err)
753                 }
754                 sem <- 1
755         }(sem)
756
757         go func(sem chan int) {
758                 err := v.Put(context.Background(), TestHash3, TestBlock3)
759                 if err != nil {
760                         t.Errorf("err3: %v", err)
761                 }
762                 sem <- 1
763         }(sem)
764
765         // Wait for all goroutines to finish
766         for done := 0; done < 3; done++ {
767                 <-sem
768         }
769
770         // Double check that we actually wrote the blocks we expected to write.
771         buf := make([]byte, BlockSize)
772         n, err := v.Get(context.Background(), TestHash, buf)
773         if err != nil {
774                 t.Errorf("Get #1: %v", err)
775         }
776         if bytes.Compare(buf[:n], TestBlock) != 0 {
777                 t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
778         }
779
780         n, err = v.Get(context.Background(), TestHash2, buf)
781         if err != nil {
782                 t.Errorf("Get #2: %v", err)
783         }
784         if bytes.Compare(buf[:n], TestBlock2) != 0 {
785                 t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
786         }
787
788         n, err = v.Get(context.Background(), TestHash3, buf)
789         if err != nil {
790                 t.Errorf("Get #3: %v", err)
791         }
792         if bytes.Compare(buf[:n], TestBlock3) != 0 {
793                 t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n]))
794         }
795 }
796
797 // Write and read back a full size block
798 func testPutFullBlock(t TB, factory TestableVolumeFactory) {
799         v := factory(t)
800         defer v.Teardown()
801
802         if !v.Writable() {
803                 return
804         }
805
806         wdata := make([]byte, BlockSize)
807         wdata[0] = 'a'
808         wdata[BlockSize-1] = 'z'
809         hash := fmt.Sprintf("%x", md5.Sum(wdata))
810         err := v.Put(context.Background(), hash, wdata)
811         if err != nil {
812                 t.Fatal(err)
813         }
814         buf := make([]byte, BlockSize)
815         n, err := v.Get(context.Background(), hash, buf)
816         if err != nil {
817                 t.Error(err)
818         }
819         if bytes.Compare(buf[:n], wdata) != 0 {
820                 t.Error("buf %+q != wdata %+q", buf[:n], wdata)
821         }
822 }
823
824 // With TrashLifetime != 0, perform:
825 // Trash an old block - which either raises ErrNotImplemented or succeeds
826 // Untrash -  which either raises ErrNotImplemented or succeeds
827 // Get - which must succeed
828 func testTrashUntrash(t TB, factory TestableVolumeFactory) {
829         v := factory(t)
830         defer v.Teardown()
831         defer func() {
832                 theConfig.TrashLifetime = 0
833         }()
834
835         theConfig.TrashLifetime.Set("1h")
836
837         // put block and backdate it
838         v.PutRaw(TestHash, TestBlock)
839         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
840
841         buf := make([]byte, BlockSize)
842         n, err := v.Get(context.Background(), TestHash, buf)
843         if err != nil {
844                 t.Fatal(err)
845         }
846         if bytes.Compare(buf[:n], TestBlock) != 0 {
847                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
848         }
849
850         // Trash
851         err = v.Trash(TestHash)
852         if v.Writable() == false {
853                 if err != MethodDisabledError {
854                         t.Fatal(err)
855                 }
856         } else if err != nil {
857                 if err != ErrNotImplemented {
858                         t.Fatal(err)
859                 }
860         } else {
861                 _, err = v.Get(context.Background(), TestHash, buf)
862                 if err == nil || !os.IsNotExist(err) {
863                         t.Errorf("os.IsNotExist(%v) should have been true", err)
864                 }
865
866                 // Untrash
867                 err = v.Untrash(TestHash)
868                 if err != nil {
869                         t.Fatal(err)
870                 }
871         }
872
873         // Get the block - after trash and untrash sequence
874         n, err = v.Get(context.Background(), TestHash, buf)
875         if err != nil {
876                 t.Fatal(err)
877         }
878         if bytes.Compare(buf[:n], TestBlock) != 0 {
879                 t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
880         }
881 }
882
883 func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
884         v := factory(t)
885         defer v.Teardown()
886         defer func(orig arvados.Duration) {
887                 theConfig.TrashLifetime = orig
888         }(theConfig.TrashLifetime)
889
890         checkGet := func() error {
891                 buf := make([]byte, BlockSize)
892                 n, err := v.Get(context.Background(), TestHash, buf)
893                 if err != nil {
894                         return err
895                 }
896                 if bytes.Compare(buf[:n], TestBlock) != 0 {
897                         t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
898                 }
899
900                 _, err = v.Mtime(TestHash)
901                 if err != nil {
902                         return err
903                 }
904
905                 err = v.Compare(context.Background(), TestHash, TestBlock)
906                 if err != nil {
907                         return err
908                 }
909
910                 indexBuf := new(bytes.Buffer)
911                 v.IndexTo("", indexBuf)
912                 if !strings.Contains(string(indexBuf.Bytes()), TestHash) {
913                         return os.ErrNotExist
914                 }
915
916                 return nil
917         }
918
919         // First set: EmptyTrash before reaching the trash deadline.
920
921         theConfig.TrashLifetime.Set("1h")
922
923         v.PutRaw(TestHash, TestBlock)
924         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
925
926         err := checkGet()
927         if err != nil {
928                 t.Fatal(err)
929         }
930
931         // Trash the block
932         err = v.Trash(TestHash)
933         if err == MethodDisabledError || err == ErrNotImplemented {
934                 // Skip the trash tests for read-only volumes, and
935                 // volume types that don't support TrashLifetime>0.
936                 return
937         }
938
939         err = checkGet()
940         if err == nil || !os.IsNotExist(err) {
941                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
942         }
943
944         err = v.Touch(TestHash)
945         if err == nil || !os.IsNotExist(err) {
946                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
947         }
948
949         v.EmptyTrash()
950
951         // Even after emptying the trash, we can untrash our block
952         // because the deadline hasn't been reached.
953         err = v.Untrash(TestHash)
954         if err != nil {
955                 t.Fatal(err)
956         }
957
958         err = checkGet()
959         if err != nil {
960                 t.Fatal(err)
961         }
962
963         err = v.Touch(TestHash)
964         if err != nil {
965                 t.Fatal(err)
966         }
967
968         // Because we Touch'ed, need to backdate again for next set of tests
969         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
970
971         // If the only block in the trash has already been untrashed,
972         // most volumes will fail a subsequent Untrash with a 404, but
973         // it's also acceptable for Untrash to succeed.
974         err = v.Untrash(TestHash)
975         if err != nil && !os.IsNotExist(err) {
976                 t.Fatalf("Expected success or os.IsNotExist(), but got: %v", err)
977         }
978
979         // The additional Untrash should not interfere with our
980         // already-untrashed copy.
981         err = checkGet()
982         if err != nil {
983                 t.Fatal(err)
984         }
985
986         // Untrash might have updated the timestamp, so backdate again
987         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
988
989         // Second set: EmptyTrash after the trash deadline has passed.
990
991         theConfig.TrashLifetime.Set("1ns")
992
993         err = v.Trash(TestHash)
994         if err != nil {
995                 t.Fatal(err)
996         }
997         err = checkGet()
998         if err == nil || !os.IsNotExist(err) {
999                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1000         }
1001
1002         // Even though 1ns has passed, we can untrash because we
1003         // haven't called EmptyTrash yet.
1004         err = v.Untrash(TestHash)
1005         if err != nil {
1006                 t.Fatal(err)
1007         }
1008         err = checkGet()
1009         if err != nil {
1010                 t.Fatal(err)
1011         }
1012
1013         // Trash it again, and this time call EmptyTrash so it really
1014         // goes away.
1015         // (In Azure volumes, un/trash changes Mtime, so first backdate again)
1016         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
1017         _ = v.Trash(TestHash)
1018         err = checkGet()
1019         if err == nil || !os.IsNotExist(err) {
1020                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1021         }
1022         v.EmptyTrash()
1023
1024         // Untrash won't find it
1025         err = v.Untrash(TestHash)
1026         if err == nil || !os.IsNotExist(err) {
1027                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1028         }
1029
1030         // Get block won't find it
1031         err = checkGet()
1032         if err == nil || !os.IsNotExist(err) {
1033                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1034         }
1035
1036         // Third set: If the same data block gets written again after
1037         // being trashed, and then the trash gets emptied, the newer
1038         // un-trashed copy doesn't get deleted along with it.
1039
1040         v.PutRaw(TestHash, TestBlock)
1041         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
1042
1043         theConfig.TrashLifetime.Set("1ns")
1044         err = v.Trash(TestHash)
1045         if err != nil {
1046                 t.Fatal(err)
1047         }
1048         err = checkGet()
1049         if err == nil || !os.IsNotExist(err) {
1050                 t.Fatalf("os.IsNotExist(%v) should have been true", err)
1051         }
1052
1053         v.PutRaw(TestHash, TestBlock)
1054         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
1055
1056         // EmptyTrash should not delete the untrashed copy.
1057         v.EmptyTrash()
1058         err = checkGet()
1059         if err != nil {
1060                 t.Fatal(err)
1061         }
1062
1063         // Fourth set: If the same data block gets trashed twice with
1064         // different deadlines A and C, and then the trash is emptied
1065         // at intermediate time B (A < B < C), it is still possible to
1066         // untrash the block whose deadline is "C".
1067
1068         v.PutRaw(TestHash, TestBlock)
1069         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
1070
1071         theConfig.TrashLifetime.Set("1ns")
1072         err = v.Trash(TestHash)
1073         if err != nil {
1074                 t.Fatal(err)
1075         }
1076
1077         v.PutRaw(TestHash, TestBlock)
1078         v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
1079
1080         theConfig.TrashLifetime.Set("1h")
1081         err = v.Trash(TestHash)
1082         if err != nil {
1083                 t.Fatal(err)
1084         }
1085
1086         // EmptyTrash should not prevent us from recovering the
1087         // time.Hour ("C") trash
1088         v.EmptyTrash()
1089         err = v.Untrash(TestHash)
1090         if err != nil {
1091                 t.Fatal(err)
1092         }
1093         err = checkGet()
1094         if err != nil {
1095                 t.Fatal(err)
1096         }
1097 }