-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
import (
"bytes"
+ "context"
+ "crypto/md5"
+ "errors"
"fmt"
- "io/ioutil"
+ "io"
+ "net/http"
"os"
- "path"
- "regexp"
"sort"
"strings"
+ "sync"
"testing"
-
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
+ . "gopkg.in/check.v1"
)
-var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
-var TestHash = "e4d909c290d0fb1ca068ffaddf22cbd0"
-var TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
-
-var TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
-var TestHash2 = "f15ac516f788aec4f30932ffb6395c39"
-
-var TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
-var TestHash3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
-
-// BadBlock is used to test collisions and corruption.
-// It must not match any test hashes.
-var BadBlock = []byte("The magic words are squeamish ossifrage.")
-
-// Empty block
-var EmptyHash = "d41d8cd98f00b204e9800998ecf8427e"
-var EmptyBlock = []byte("")
+// TestGocheck is the "go test" entry point; it hands control to
+// gocheck's TestingT, which runs the suites registered with Suite.
+func TestGocheck(t *testing.T) {
+ TestingT(t)
+}
-// TODO(twp): Tests still to be written
-//
-// * TestPutBlockFull
-// - test that PutBlock returns 503 Full if the filesystem is full.
-// (must mock FreeDiskSpace or Statfs? use a tmpfs?)
-//
-// * TestPutBlockWriteErr
-// - test the behavior when Write returns an error.
-// - Possible solutions: use a small tmpfs and a high
-// MIN_FREE_KILOBYTES to trick PutBlock into attempting
-// to write a block larger than the amount of space left
-// - use an interface to mock ioutil.TempFile with a File
-// object that always returns an error on write
-//
-// ========================================
-// GetBlock tests.
-// ========================================
+// Hex MD5 digests of the three-byte blocks "foo" and "bar".
+const (
+ fooHash = "acbd18db4cc2f85cedef654fccc4a4d8"
+ barHash = "37b51d194a7513e45b56f6524f2d51f2"
+)
-// TestGetBlock
-// Test that simple block reads succeed.
-//
-func TestGetBlock(t *testing.T) {
- defer teardown()
+// testServiceURL is the service URL that testKeepstore passes to
+// newKeepstore.
+var testServiceURL = func() arvados.URL {
+ return arvados.URL{Host: "localhost:12345", Scheme: "http"}
+}()
- // Prepare two test Keep volumes. Our block is stored on the second volume.
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
+// authContext returns a context whose auth credentials consist of
+// the single given token.
+func authContext(token string) context.Context {
+ return auth.NewContext(context.TODO(), &auth.Credentials{Tokens: []string{token}})
+}
- vols := KeepVM.AllReadable()
- if err := vols[1].Put(TestHash, TestBlock); err != nil {
- t.Error(err)
+// testCluster loads a config containing one cluster ("zzzzz") with
+// no explicit settings, and returns that cluster with
+// SystemRootToken and ManagementToken set to the arvadostest
+// fixture values. A load or lookup error fails the test at once.
+func testCluster(t TB) *arvados.Cluster {
+ cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
+ if err != nil {
+ t.Fatal(err)
}
-
- // Check that GetBlock returns success.
- result, err := GetBlock(TestHash)
+ cluster, err := cfg.GetCluster("")
if err != nil {
- t.Errorf("GetBlock error: %s", err)
+ t.Fatal(err)
}
- if fmt.Sprint(result) != fmt.Sprint(TestBlock) {
- t.Errorf("expected %s, got %s", TestBlock, result)
+ cluster.SystemRootToken = arvadostest.SystemRootToken
+ cluster.ManagementToken = arvadostest.ManagementToken
+ return cluster
+}
+
+// testKeepstore builds a keepstore for the given cluster using the
+// cluster's SystemRootToken and testServiceURL. If reg is nil, a
+// new prometheus registry is used. The keepstore's context carries a
+// test logger; the returned CancelFunc cancels that context.
+// Construction errors fail the test.
+func testKeepstore(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*keepstore, context.CancelFunc) {
+ if reg == nil {
+ reg = prometheus.NewRegistry()
}
+ ctx, cancel := context.WithCancel(context.Background())
+ ctx = ctxlog.Context(ctx, ctxlog.TestLogger(t))
+ ks, err := newKeepstore(ctx, cluster, cluster.SystemRootToken, reg, testServiceURL)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return ks, cancel
}
-// TestGetBlockMissing
-// GetBlock must return an error when the block is not found.
-//
-func TestGetBlockMissing(t *testing.T) {
- defer teardown()
+// Register keepstoreSuite with gocheck.
+var _ = Suite(&keepstoreSuite{})
- // Create two empty test Keep volumes.
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
+// keepstoreSuite is the gocheck suite for the keepstore tests. Its
+// cluster field holds the config that tests adjust before building
+// a keepstore from it.
+type keepstoreSuite struct {
+ cluster *arvados.Cluster
+}
- // Check that GetBlock returns failure.
- result, err := GetBlock(TestHash)
- if err != NotFoundError {
- t.Errorf("Expected NotFoundError, got %v", result)
+// SetUpTest installs a fresh test cluster config with two
+// stub-driver volumes (replication 1 each).
+func (s *keepstoreSuite) SetUpTest(c *C) {
+ s.cluster = testCluster(c)
+ s.cluster.Volumes = map[string]arvados.Volume{
+ "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
+ "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
}
}
-// TestGetBlockCorrupt
-// GetBlock must return an error when a corrupted block is requested
-// (the contents of the file do not checksum to its hash).
-//
-func TestGetBlockCorrupt(t *testing.T) {
- defer teardown()
+// TestBlockRead_ChecksumMismatch stores wrong data under the hash of
+// "foo" on one volume and checks that keepstore reports a collision
+// on write and a checksum mismatch on read, without delivering the
+// full 3 bytes to the reader.
+func (s *keepstoreSuite) TestBlockRead_ChecksumMismatch(c *C) {
+ ks, cancel := testKeepstore(c, s.cluster, nil)
+ defer cancel()
+
+ ctx := authContext(arvadostest.ActiveTokenV2)
+
+ // Local fooHash shadows the package-level constant (same value).
+ fooHash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ // Plant a corrupt replica: "bar" stored under the hash of "foo".
+ err := ks.mountsW[0].BlockWrite(ctx, fooHash, []byte("bar"))
+ c.Assert(err, IsNil)
+
+ // Writing the correct data via keepstore should now be refused.
+ _, err = ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+ Hash: fooHash,
+ Data: []byte("foo"),
+ })
+ c.Check(err, ErrorMatches, "hash collision")
+
+ buf := bytes.NewBuffer(nil)
+ _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
+ Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
+ WriteTo: buf,
+ })
+ c.Check(err, ErrorMatches, "checksum mismatch in stored data")
+ c.Check(buf.String(), Not(Equals), "foo")
+ c.Check(buf.Len() < 3, Equals, true)
+
+ // Add a good replica on the second writable mount; the read
+ // below is still expected to report the mismatch.
+ err = ks.mountsW[1].BlockWrite(ctx, fooHash, []byte("foo"))
+ c.Assert(err, IsNil)
+
+ buf = bytes.NewBuffer(nil)
+ _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
+ Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
+ WriteTo: buf,
+ })
+ c.Check(err, ErrorMatches, "checksum mismatch in stored data")
+ c.Check(buf.Len() < 3, Equals, true)
+}
- // Create two test Keep volumes and store a corrupt block in one.
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
+// TestBlockReadWrite_SigningDisabled checks that, with BlobSigning
+// off and no signing key configured, BlockWrite returns an unsigned
+// locator and BlockRead succeeds for both unsigned and signed
+// locators regardless of the caller's token.
+func (s *keepstoreSuite) TestBlockReadWrite_SigningDisabled(c *C) {
+ // Keep the original key so a signed locator can still be built below.
+ origKey := s.cluster.Collections.BlobSigningKey
+ s.cluster.Collections.BlobSigning = false
+ s.cluster.Collections.BlobSigningKey = ""
+ ks, cancel := testKeepstore(c, s.cluster, nil)
+ defer cancel()
+
+ resp, err := ks.BlockWrite(authContext("abcde"), arvados.BlockWriteOptions{
+ Hash: fooHash,
+ Data: []byte("foo"),
+ })
+ c.Assert(err, IsNil)
+ c.Check(resp.Locator, Equals, fooHash+"+3")
+ locUnsigned := resp.Locator
+ ttl := time.Hour
+ locSigned := arvados.SignLocator(locUnsigned, arvadostest.ActiveTokenV2, time.Now().Add(ttl), ttl, []byte(origKey))
+ c.Assert(locSigned, Not(Equals), locUnsigned)
+
+ // Every locator/token combination should read back "foo".
+ for _, locator := range []string{locUnsigned, locSigned} {
+ for _, token := range []string{"", "xyzzy", arvadostest.ActiveTokenV2} {
+ c.Logf("=== locator %q token %q", locator, token)
+ ctx := authContext(token)
+ buf := bytes.NewBuffer(nil)
+ _, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
+ Locator: locator,
+ WriteTo: buf,
+ })
+ c.Check(err, IsNil)
+ c.Check(buf.String(), Equals, "foo")
+ }
+ }
+}
- vols := KeepVM.AllReadable()
- vols[0].Put(TestHash, BadBlock)
+// TestBlockRead_OrderedByStorageClassPriority writes a block with
+// storage class "class1" and then checks, for several storage class
+// priority settings, the order in which BlockRead consults the two
+// stub volumes (as recorded in a shared stubLog).
+func (s *keepstoreSuite) TestBlockRead_OrderedByStorageClassPriority(c *C) {
+ s.cluster.Volumes = map[string]arvados.Volume{
+ "zzzzz-nyw5e-111111111111111": {
+ Driver: "stub",
+ Replication: 1,
+ StorageClasses: map[string]bool{"class1": true}},
+ "zzzzz-nyw5e-222222222222222": {
+ Driver: "stub",
+ Replication: 1,
+ StorageClasses: map[string]bool{"class2": true, "class3": true}},
+ }
+
+ // "foobar" is just some data that happens to result in
+ // rendezvous order {111, 222}
+ data := []byte("foobar")
+ hash := fmt.Sprintf("%x", md5.Sum(data))
+
+ for _, trial := range []struct {
+ priority1 int // priority of class1, thus vol1
+ priority2 int // priority of class2
+ priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
+ expectLog string
+ }{
+ {100, 50, 50, "111 read 385\n"}, // class1 has higher priority => try vol1 first, no need to try vol2
+ {100, 100, 100, "111 read 385\n"}, // same priority, vol1 is first in rendezvous order => try vol1 first and succeed
+ {66, 99, 33, "222 read 385\n111 read 385\n"}, // class2 has higher priority => try vol2 first, then try vol1
+ {66, 33, 99, "222 read 385\n111 read 385\n"}, // class3 has highest priority => vol2 has highest => try vol2 first, then try vol1
+ } {
+ c.Logf("=== %+v", trial)
+
+ s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
+ "class1": {Priority: trial.priority1},
+ "class2": {Priority: trial.priority2},
+ "class3": {Priority: trial.priority3},
+ }
+ ks, cancel := testKeepstore(c, s.cluster, nil)
+ // NOTE(review): deferred inside the loop, so each trial's
+ // cancel runs only when the test function returns.
+ defer cancel()
+
+ ctx := authContext(arvadostest.ActiveTokenV2)
+ resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+ Hash: hash,
+ Data: data,
+ StorageClasses: []string{"class1"},
+ })
+ c.Assert(err, IsNil)
+
+ // Combine logs into one. (We only want the logs from
+ // the BlockRead below, not from BlockWrite above.)
+ stubLog := &stubLog{}
+ for _, mnt := range ks.mounts {
+ mnt.volume.(*stubVolume).stubLog = stubLog
+ }
- // Check that GetBlock returns failure.
- result, err := GetBlock(TestHash)
- if err != DiskHashError {
- t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
+ n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
+ Locator: resp.Locator,
+ WriteTo: io.Discard,
+ })
+ c.Assert(n, Equals, len(data))
+ c.Assert(err, IsNil)
+ c.Check(stubLog.String(), Equals, trial.expectLog)
}
}
-// ========================================
-// PutBlock tests
-// ========================================
+// TestBlockWrite_NoWritableVolumes marks every configured volume
+// read-only and checks that BlockWrite fails with an error whose
+// HTTPStatus() is 507 Insufficient Storage, without calling any
+// volume's BlockWrite.
+func (s *keepstoreSuite) TestBlockWrite_NoWritableVolumes(c *C) {
+ for uuid, v := range s.cluster.Volumes {
+ v.ReadOnly = true
+ s.cluster.Volumes[uuid] = v
+ }
+ ks, cancel := testKeepstore(c, s.cluster, nil)
+ defer cancel()
+ // Any attempt to write to a volume is itself a test failure.
+ for _, mnt := range ks.mounts {
+ mnt.volume.(*stubVolume).blockWrite = func(context.Context, string, []byte) error {
+ c.Error("volume BlockWrite called")
+ return errors.New("fail")
+ }
+ }
+ ctx := authContext(arvadostest.ActiveTokenV2)
-// TestPutBlockOK
-// PutBlock can perform a simple block write and returns success.
-//
-func TestPutBlockOK(t *testing.T) {
- defer teardown()
+ _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+ Hash: fooHash,
+ Data: []byte("foo")})
+ c.Check(err, NotNil)
+ // NOTE(review): c.Check above does not stop the test, so the
+ // type assertion below panics if err is nil.
+ c.Check(err.(interface{ HTTPStatus() int }).HTTPStatus(), Equals, http.StatusInsufficientStorage)
+}
- // Create two test Keep volumes.
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
+// TestBlockWrite_MultipleStorageClasses writes the same block twice
+// per trial, requesting various storage class combinations, and
+// compares the combined log of all stub volumes against the expected
+// sequence of read/write/touch operations. The last trial requests a
+// class ("class404") that no volume offers and expects the same log
+// as the trial without it.
+func (s *keepstoreSuite) TestBlockWrite_MultipleStorageClasses(c *C) {
+ s.cluster.Volumes = map[string]arvados.Volume{
+ "zzzzz-nyw5e-111111111111111": {
+ Driver: "stub",
+ Replication: 1,
+ StorageClasses: map[string]bool{"class1": true}},
+ "zzzzz-nyw5e-121212121212121": {
+ Driver: "stub",
+ Replication: 1,
+ StorageClasses: map[string]bool{"class1": true, "class2": true}},
+ "zzzzz-nyw5e-222222222222222": {
+ Driver: "stub",
+ Replication: 1,
+ StorageClasses: map[string]bool{"class2": true}},
+ }
+
+ // testData is a block that happens to have rendezvous order 111, 121, 222
+ testData := []byte("qux")
+ testHash := fmt.Sprintf("%x+%d", md5.Sum(testData), len(testData))
+
+ s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
+ "class1": {},
+ "class2": {},
+ "class3": {},
+ }
+
+ ctx := authContext(arvadostest.ActiveTokenV2)
+ for idx, trial := range []struct {
+ classes string // desired classes
+ expectLog string
+ }{
+ {"class1", "" +
+ "111 read d85\n" +
+ "121 read d85\n" +
+ "111 write d85\n" +
+ "111 read d85\n" +
+ "111 touch d85\n"},
+ {"class2", "" +
+ "121 read d85\n" + // write#1
+ "222 read d85\n" +
+ "121 write d85\n" +
+ "121 read d85\n" + // write#2
+ "121 touch d85\n"},
+ {"class1,class2", "" +
+ "111 read d85\n" + // write#1
+ "121 read d85\n" +
+ "222 read d85\n" +
+ "121 write d85\n" +
+ "111 write d85\n" +
+ "111 read d85\n" + // write#2
+ "111 touch d85\n" +
+ "121 read d85\n" +
+ "121 touch d85\n"},
+ {"class1,class2,class404", "" +
+ "111 read d85\n" + // write#1
+ "121 read d85\n" +
+ "222 read d85\n" +
+ "121 write d85\n" +
+ "111 write d85\n" +
+ "111 read d85\n" + // write#2
+ "111 touch d85\n" +
+ "121 read d85\n" +
+ "121 touch d85\n"},
+ } {
+ c.Logf("=== %d: %+v", idx, trial)
+
+ ks, cancel := testKeepstore(c, s.cluster, nil)
+ // NOTE(review): deferred inside the loop, so each trial's
+ // cancel runs only when the test function returns.
+ defer cancel()
+ // Route every volume's log into one shared stubLog.
+ stubLog := &stubLog{}
+ for _, mnt := range ks.mounts {
+ mnt.volume.(*stubVolume).stubLog = stubLog
+ }
- // Check that PutBlock stores the data as expected.
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
- t.Fatalf("PutBlock: n %d err %v", n, err)
+ // Check that we chose the right block data
+ rvz := ks.rendezvous(testHash, ks.mountsW)
+ c.Assert(rvz[0].UUID[24:], Equals, "111")
+ c.Assert(rvz[1].UUID[24:], Equals, "121")
+ c.Assert(rvz[2].UUID[24:], Equals, "222")
+
+ // Write twice: the expected logs cover both writes
+ // (marked write#1 and write#2 above).
+ for i := 0; i < 2; i++ {
+ _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+ Hash: testHash,
+ Data: testData,
+ StorageClasses: strings.Split(trial.classes, ","),
+ })
+ c.Check(err, IsNil)
+ }
+ c.Check(stubLog.String(), Equals, trial.expectLog)
}
+}
- vols := KeepVM.AllReadable()
- result, err := vols[1].Get(TestHash)
- if err != nil {
- t.Fatalf("Volume #0 Get returned error: %v", err)
+// TestBlockTrash exercises ks.BlockTrash against four stub volumes:
+// two writable, one read-only, and one read-only with
+// AllowTrashWhenReadOnly. Each scenario places replicas of "foo" on
+// selected volumes, calls BlockTrash, and checks which replicas
+// remain readable.
+func (s *keepstoreSuite) TestBlockTrash(c *C) {
+ s.cluster.Volumes = map[string]arvados.Volume{
+ "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
+ "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
+ "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
+ "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
+ }
+ ks, cancel := testKeepstore(c, s.cluster, nil)
+ defer cancel()
+
+ // Collect the stub volumes sorted by UUID, so vol[i] is the
+ // volume whose UUID ends in iii...
+ var vol []*stubVolume
+ for _, mount := range ks.mountsR {
+ vol = append(vol, mount.volume.(*stubVolume))
+ }
+ sort.Slice(vol, func(i, j int) bool {
+ return vol[i].params.UUID < vol[j].params.UUID
+ })
+
+ ctx := context.Background()
+ loc := fooHash + "+3"
+ // tOld is one second older than the blob signing TTL.
+ tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
+
+ // clear trashes fooHash on every volume, tolerating "not exist".
+ clear := func() {
+ for _, vol := range vol {
+ err := vol.BlockTrash(fooHash)
+ if !os.IsNotExist(err) {
+ c.Assert(err, IsNil)
+ }
+ }
}
- if string(result) != string(TestBlock) {
- t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
- string(TestBlock), string(result))
+ // writeit stores "foo" directly on vol[volidx] and backdates
+ // its mtime to tOld.
+ writeit := func(volidx int) {
+ err := vol[volidx].BlockWrite(ctx, fooHash, []byte("foo"))
+ c.Assert(err, IsNil)
+ err = vol[volidx].blockTouchWithTime(fooHash, tOld)
+ c.Assert(err, IsNil)
}
+ trashit := func() error {
+ return ks.BlockTrash(ctx, loc)
+ }
+ // checkexists reports whether fooHash can be read from
+ // vol[volidx]; errors other than "not exist" fail the check.
+ checkexists := func(volidx int) bool {
+ err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
+ if !os.IsNotExist(err) {
+ c.Check(err, IsNil)
+ }
+ return err == nil
+ }
+
+ // no replicas => not found
+ clear()
+ c.Check(trashit(), Equals, os.ErrNotExist)
+
+ // one old replica => trash it
+ clear()
+ writeit(0)
+ c.Check(trashit(), IsNil)
+ c.Check(checkexists(0), Equals, false)
+
+ // one old replica + one new replica => keep new, trash old
+ clear()
+ writeit(0)
+ writeit(1)
+ c.Check(vol[1].blockTouchWithTime(fooHash, time.Now()), IsNil)
+ c.Check(trashit(), IsNil)
+ c.Check(checkexists(0), Equals, false)
+ c.Check(checkexists(1), Equals, true)
+
+ // two old replicas => trash both
+ clear()
+ writeit(0)
+ writeit(1)
+ c.Check(trashit(), IsNil)
+ c.Check(checkexists(0), Equals, false)
+ c.Check(checkexists(1), Equals, false)
+
+ // four old replicas => trash all except readonly volume with
+ // AllowTrashWhenReadOnly==false
+ clear()
+ writeit(0)
+ writeit(1)
+ writeit(2)
+ writeit(3)
+ c.Check(trashit(), IsNil)
+ c.Check(checkexists(0), Equals, false)
+ c.Check(checkexists(1), Equals, false)
+ c.Check(checkexists(2), Equals, true)
+ c.Check(checkexists(3), Equals, false)
+
+ // two old replicas but one returns an error => return the
+ // only non-404 backend error
+ clear()
+ vol[0].blockTrash = func(hash string) error {
+ return errors.New("fake error")
+ }
+ writeit(0)
+ writeit(3)
+ c.Check(trashit(), ErrorMatches, "fake error")
+ c.Check(checkexists(0), Equals, true)
+ c.Check(checkexists(1), Equals, false)
+ c.Check(checkexists(2), Equals, false)
+ c.Check(checkexists(3), Equals, false)
}
-// TestPutBlockOneVol
-// PutBlock still returns success even when only one of the known
-// volumes is online.
-//
-func TestPutBlockOneVol(t *testing.T) {
- defer teardown()
-
- // Create two test Keep volumes, but cripple one of them.
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
-
- vols := KeepVM.AllWritable()
- vols[0].(*MockVolume).Bad = true
-
- // Check that PutBlock stores the data as expected.
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
- t.Fatalf("PutBlock: n %d err %v", n, err)
+// TestBlockWrite_OnlyOneBuffer checks that a BlockWrite completes
+// within one second when MaxKeepBlobBuffers is 1, i.e., that a
+// single write does not deadlock waiting for a second buffer.
+func (s *keepstoreSuite) TestBlockWrite_OnlyOneBuffer(c *C) {
+ s.cluster.API.MaxKeepBlobBuffers = 1
+ ks, cancel := testKeepstore(c, s.cluster, nil)
+ defer cancel()
+ // ok is closed when the background write returns.
+ ok := make(chan struct{})
+ go func() {
+ defer close(ok)
+ ctx := authContext(arvadostest.ActiveTokenV2)
+ _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+ Hash: fooHash,
+ Data: []byte("foo")})
+ c.Check(err, IsNil)
+ }()
+ select {
+ case <-ok:
+ case <-time.After(time.Second):
+ c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1")
}
+}
- result, err := GetBlock(TestHash)
- if err != nil {
- t.Fatalf("GetBlock: %v", err)
- }
- if string(result) != string(TestBlock) {
- t.Error("PutBlock/GetBlock mismatch")
- t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
- string(TestBlock), string(result))
+// TestBufferPoolLeak runs 20 concurrent write-then-read sequences
+// with MaxKeepBlobBuffers set to 4 and fails if they do not all
+// finish within one second.
+func (s *keepstoreSuite) TestBufferPoolLeak(c *C) {
+ s.cluster.API.MaxKeepBlobBuffers = 4
+ ks, cancel := testKeepstore(c, s.cluster, nil)
+ defer cancel()
+
+ ctx := authContext(arvadostest.ActiveTokenV2)
+ var wg sync.WaitGroup
+ for range make([]int, 20) {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+ Hash: fooHash,
+ Data: []byte("foo")})
+ c.Check(err, IsNil)
+ _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
+ Locator: resp.Locator,
+ WriteTo: io.Discard})
+ c.Check(err, IsNil)
+ }()
+ }
+ // ok is closed once every goroutine above has finished.
+ ok := make(chan struct{})
+ go func() {
+ wg.Wait()
+ close(ok)
+ }()
+ select {
+ case <-ok:
+ case <-time.After(time.Second):
+ c.Fatal("read/write sequence deadlocks, likely buffer pool leak")
}
}
-// TestPutBlockMD5Fail
-// Check that PutBlock returns an error if passed a block and hash that
-// do not match.
-//
-func TestPutBlockMD5Fail(t *testing.T) {
- defer teardown()
+// TestPutStorageClasses checks the Replicas and StorageClasses
+// reported by BlockWrite for various requested storage class lists,
+// then checks that requests naming only nonexistent or read-only
+// classes fail.
+func (s *keepstoreSuite) TestPutStorageClasses(c *C) {
+ s.cluster.Volumes = map[string]arvados.Volume{
+ "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, // "default" is implicit
+ "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"special": true, "extra": true}},
+ "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
+ }
+ ks, cancel := testKeepstore(c, s.cluster, nil)
+ defer cancel()
+ ctx := authContext(arvadostest.ActiveTokenV2)
+
+ // Success cases: requested classes, expected replica count,
+ // and expected per-class replica counts in the response.
+ for _, trial := range []struct {
+ ask []string
+ expectReplicas int
+ expectClasses map[string]int
+ }{
+ {nil,
+ 1,
+ map[string]int{"default": 1}},
+ {[]string{},
+ 1,
+ map[string]int{"default": 1}},
+ {[]string{"default"},
+ 1,
+ map[string]int{"default": 1}},
+ {[]string{"default", "default"},
+ 1,
+ map[string]int{"default": 1}},
+ {[]string{"special"},
+ 1,
+ map[string]int{"extra": 1, "special": 1}},
+ {[]string{"special", "readonly"},
+ 1,
+ map[string]int{"extra": 1, "special": 1}},
+ {[]string{"special", "nonexistent"},
+ 1,
+ map[string]int{"extra": 1, "special": 1}},
+ {[]string{"extra", "special"},
+ 1,
+ map[string]int{"extra": 1, "special": 1}},
+ {[]string{"default", "special"},
+ 2,
+ map[string]int{"default": 1, "extra": 1, "special": 1}},
+ } {
+ c.Logf("success case %#v", trial)
+ resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+ Hash: fooHash,
+ Data: []byte("foo"),
+ StorageClasses: trial.ask,
+ })
+ if !c.Check(err, IsNil) {
+ continue
+ }
+ c.Check(resp.Replicas, Equals, trial.expectReplicas)
+ if len(trial.expectClasses) == 0 {
+ // any non-empty value is correct
+ c.Check(resp.StorageClasses, Not(HasLen), 0)
+ } else {
+ c.Check(resp.StorageClasses, DeepEquals, trial.expectClasses)
+ }
+ }
- // Create two test Keep volumes.
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
+ // Failure cases: no writable volume offers any requested class.
+ for _, ask := range [][]string{
+ {"doesnotexist"},
+ {"doesnotexist", "readonly"},
+ {"readonly"},
+ } {
+ c.Logf("failure case %s", ask)
+ _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+ Hash: fooHash,
+ Data: []byte("foo"),
+ StorageClasses: ask,
+ })
+ c.Check(err, NotNil)
+ }
+}
- // Check that PutBlock returns the expected error when the hash does
- // not match the block.
- if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
- t.Errorf("Expected RequestHashError, got %v", err)
+// TestUntrashHandlerWithNoWritableVolumes marks every volume
+// read-only, stores "foo" directly on each mount, and checks that
+// ks.BlockUntrash returns a "not exist" error while the block stays
+// readable on every mount.
+func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
+ for uuid, v := range s.cluster.Volumes {
+ v.ReadOnly = true
+ s.cluster.Volumes[uuid] = v
}
+ ks, cancel := testKeepstore(c, s.cluster, nil)
+ defer cancel()
- // Confirm that GetBlock fails to return anything.
- if result, err := GetBlock(TestHash); err != NotFoundError {
- t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
- string(result), err)
+ // Write the block on each mount directly and confirm it reads back.
+ for _, mnt := range ks.mounts {
+ err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
+ c.Assert(err, IsNil)
+ err = mnt.BlockRead(context.Background(), fooHash, brdiscard)
+ c.Assert(err, IsNil)
}
-}
-// TestPutBlockCorrupt
-// PutBlock should overwrite corrupt blocks on disk when given
-// a PUT request with a good block.
-//
-func TestPutBlockCorrupt(t *testing.T) {
- defer teardown()
+ err := ks.BlockUntrash(context.Background(), fooHash)
+ c.Check(os.IsNotExist(err), Equals, true)
- // Create two test Keep volumes.
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
+ // The untrash attempt must not have disturbed the stored blocks.
+ for _, mnt := range ks.mounts {
+ err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
+ c.Assert(err, IsNil)
+ }
+}
- // Store a corrupted block under TestHash.
- vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, BadBlock)
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
- t.Errorf("PutBlock: n %d err %v", n, err)
+// TestBlockWrite_SkipReadOnly writes 32 distinct blocks and checks
+// that the writable volume's log records writes while the logs of
+// both read-only volumes stay empty.
+func (s *keepstoreSuite) TestBlockWrite_SkipReadOnly(c *C) {
+ s.cluster.Volumes = map[string]arvados.Volume{
+ "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
+ "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", ReadOnly: true},
+ "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
}
+ ks, cancel := testKeepstore(c, s.cluster, nil)
+ defer cancel()
+ ctx := authContext(arvadostest.ActiveTokenV2)
- // The block on disk should now match TestBlock.
- if block, err := GetBlock(TestHash); err != nil {
- t.Errorf("GetBlock: %v", err)
- } else if bytes.Compare(block, TestBlock) != 0 {
- t.Errorf("GetBlock returned: '%s'", string(block))
+ // Only Data is supplied here; no Hash is given to BlockWrite.
+ for i := range make([]byte, 32) {
+ data := []byte(fmt.Sprintf("block %d", i))
+ _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{Data: data})
+ c.Assert(err, IsNil)
}
+ c.Check(ks.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume).stubLog.String(), Matches, "(?ms).*write.*")
+ c.Check(ks.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
+ c.Check(ks.mounts["zzzzz-nyw5e-222222222222222"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
}
-// TestPutBlockCollision
-// PutBlock returns a 400 Collision error when attempting to
-// store a block that collides with another block on disk.
-//
-func TestPutBlockCollision(t *testing.T) {
- defer teardown()
+// TestGetLocatorInfo is a table test for getLocatorInfo. Valid
+// locators must parse without error and yield the expected hash,
+// size, signed and remote fields; invalid locators must return an
+// error.
+func (s *keepstoreSuite) TestGetLocatorInfo(c *C) {
+ for _, trial := range []struct {
+ locator string
+ ok bool
+ expect locatorInfo
+ }{
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+ ok: true},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
+ ok: true, expect: locatorInfo{size: 1234}},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Abcdef@abcdef",
+ ok: true, expect: locatorInfo{size: 1234, signed: true}},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Rzzzzz-abcdef",
+ ok: true, expect: locatorInfo{size: 1234, remote: true}},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345+Zexample+Rzzzzz-abcdef",
+ ok: true, expect: locatorInfo{size: 12345, remote: true}},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123456+👶🦈+Rzzzzz-abcdef",
+ ok: true, expect: locatorInfo{size: 123456, remote: true}},
+ // invalid: bad hash char
+ {locator: "aaaaaaaaaaaaaazaaaaaaaaaaaaaaaaa+1234",
+ ok: false},
+ {locator: "aaaaaaaaaaaaaaFaaaaaaaaaaaaaaaaa+1234",
+ ok: false},
+ {locator: "aaaaaaaaaaaaaa⛵aaaaaaaaaaaaaaaaa+1234",
+ ok: false},
+ // invalid: hash length != 32
+ {locator: "",
+ ok: false},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+ ok: false},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
+ ok: false},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb",
+ ok: false},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb+1234",
+ ok: false},
+ // invalid: first hint is not size
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+Abcdef+1234",
+ ok: false},
+ // invalid: leading/trailing/double +
+ {locator: "+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
+ ok: false},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+",
+ ok: false},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa++1234",
+ ok: false},
+ {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234++Abcdef@abcdef",
+ ok: false},
+ } {
+ c.Logf("=== %s", trial.locator)
+ li, err := getLocatorInfo(trial.locator)
+ if !trial.ok {
+ c.Check(err, NotNil)
+ continue
+ }
+ c.Check(err, IsNil)
+ // For valid locators the hash is the first 32 characters.
+ c.Check(li.hash, Equals, trial.locator[:32])
+ c.Check(li.size, Equals, trial.expect.size)
+ c.Check(li.signed, Equals, trial.expect.signed)
+ c.Check(li.remote, Equals, trial.expect.remote)
+ }
+}
- // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
- b1 := arvadostest.MD5CollisionData[0]
- b2 := arvadostest.MD5CollisionData[1]
- locator := arvadostest.MD5CollisionMD5
+// init registers the "stub" volume driver. Each volume it creates is
+// a stubVolume with an empty data map and its own stubLog.
+func init() {
+ driver["stub"] = func(params newVolumeParams) (volume, error) {
+ v := &stubVolume{
+ params: params,
+ data: make(map[string]stubData),
+ stubLog: &stubLog{},
+ }
+ return v, nil
+ }
+}
- // Prepare two test Keep volumes.
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
+// stubLog is a text buffer with an embedded mutex. Printf takes the
+// mutex while appending; the embedded Buffer methods (e.g. String)
+// do not lock by themselves.
+type stubLog struct {
+ sync.Mutex
+ bytes.Buffer
+}
- // Store one block, then attempt to store the other. Confirm that
- // PutBlock reported a CollisionError.
- if _, err := PutBlock(b1, locator); err != nil {
- t.Error(err)
- }
- if _, err := PutBlock(b2, locator); err == nil {
- t.Error("PutBlock did not report a collision")
- } else if err != CollisionError {
- t.Errorf("PutBlock returned %v", err)
+// Printf appends the formatted message plus a trailing newline to
+// the log while holding the mutex. Calling it on a nil *stubLog is a
+// no-op.
+func (sl *stubLog) Printf(format string, args ...interface{}) {
+ if sl == nil {
+ return
}
+ sl.Lock()
+ defer sl.Unlock()
+ fmt.Fprintf(sl, format+"\n", args...)
}
-// TestPutBlockTouchFails
-// When PutBlock is asked to PUT an existing block, but cannot
-// modify the timestamp, it should write a second block.
-//
-func TestPutBlockTouchFails(t *testing.T) {
- defer teardown()
-
- // Prepare two test Keep volumes.
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
- vols := KeepVM.AllWritable()
-
- // Store a block and then make the underlying volume bad,
- // so a subsequent attempt to update the file timestamp
- // will fail.
- vols[0].Put(TestHash, BadBlock)
- oldMtime, err := vols[0].Mtime(TestHash)
- if err != nil {
- t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
- }
+// stubData is one block stored in a stubVolume.
+type stubData struct {
+ mtime time.Time // last write/touch time
+ data []byte // block content
+ trash time.Time // zero if not trashed; otherwise set by BlockTrash to now + BlobTrashLifetime
+}
- // vols[0].Touch will fail on the next call, so the volume
- // manager will store a copy on vols[1] instead.
- vols[0].(*MockVolume).Touchable = false
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
- t.Fatalf("PutBlock: n %d err %v", n, err)
- }
- vols[0].(*MockVolume).Touchable = true
+// stubVolume is an in-memory volume used by tests. Blocks live in
+// the data map (guarded by mtx), and each operation is recorded in
+// stubLog.
+type stubVolume struct {
+ params newVolumeParams
+ data map[string]stubData
+ stubLog *stubLog
+ mtx sync.Mutex
+
+ // The following funcs enable tests to insert delays and
+ // failures. Each volume operation begins by calling the
+ // corresponding func (if non-nil). If the func returns an
+ // error, that error is returned to caller. Otherwise, the
+ // stub continues normally.
+ blockRead func(ctx context.Context, hash string, writeTo io.WriterAt) error
+ blockWrite func(ctx context.Context, hash string, data []byte) error
+ deviceID func() string
+ blockTouch func(hash string) error
+ blockTrash func(hash string) error
+ blockUntrash func(hash string) error
+ index func(ctx context.Context, prefix string, writeTo io.Writer) error
+ mtime func(hash string) (time.Time, error)
+ emptyTrash func()
+}
- // Now the mtime on the block on vols[0] should be unchanged, and
- // there should be a copy of the block on vols[1].
- newMtime, err := vols[0].Mtime(TestHash)
- if err != nil {
- t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
+// log records one operation in the volume's stubLog as
+// "<UUID chars 24-26> <op> <first 3 chars of hash>".
+func (v *stubVolume) log(op, hash string) {
+ // Note this intentionally crashes if UUID or hash is short --
+ // if keepstore ever does that, tests should fail.
+ v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
+}
+
+// BlockRead logs a "read", runs the blockRead hook (if set), and
+// then copies the stored block to writeTo. It returns
+// os.ErrNotExist if the block is absent or trashed, or the first
+// error returned by the hook or by writeTo.WriteAt.
+func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
+ v.log("read", hash)
+ if v.blockRead != nil {
+ err := v.blockRead(ctx, hash, writeTo)
+ if err != nil {
+ return err
+ }
}
- if !newMtime.Equal(oldMtime) {
- t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
- oldMtime, newMtime)
+ v.mtx.Lock()
+ ent, ok := v.data[hash]
+ v.mtx.Unlock()
+ if !ok || !ent.trash.IsZero() {
+ return os.ErrNotExist
+ }
+ // Deliver the data in several WriteAt calls: the chunk size
+ // starts at 1000 bytes and doubles on each iteration.
+ wrote := 0
+ for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
+ data := ent.data[wrote:]
+ if len(data) > writesize {
+ data = data[:writesize]
+ }
+ n, err := writeTo.WriteAt(data, int64(wrote))
+ wrote += n
+ if err != nil {
+ return err
+ }
}
- result, err := vols[1].Get(TestHash)
- if err != nil {
- t.Fatalf("vols[1]: %v", err)
+ return nil
+}
+
+// BlockWrite logs a "write", runs the blockWrite hook (if set), and
+// then stores a private copy of data under hash with mtime set to
+// now. Any existing entry for hash, including its trash state, is
+// replaced.
+func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+ v.log("write", hash)
+ if v.blockWrite != nil {
+ if err := v.blockWrite(ctx, hash, data); err != nil {
+ return err
+ }
}
- if bytes.Compare(result, TestBlock) != 0 {
- t.Errorf("new block does not match test block\nnew block = %v\n", result)
+ v.mtx.Lock()
+ defer v.mtx.Unlock()
+ v.data[hash] = stubData{
+ mtime: time.Now(),
+ data: append([]byte(nil), data...),
}
+ return nil
}
-func TestDiscoverTmpfs(t *testing.T) {
- var tempVols [4]string
- var err error
+// DeviceID returns the stubVolume's pointer value formatted with
+// %p. The deviceID hook field is not consulted here.
+func (v *stubVolume) DeviceID() string {
+ return fmt.Sprintf("%p", v)
+}
- // Create some directories suitable for using as keep volumes.
- for i := range tempVols {
- if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
- t.Fatal(err)
- }
- defer os.RemoveAll(tempVols[i])
- tempVols[i] = tempVols[i] + "/keep"
- if err = os.Mkdir(tempVols[i], 0755); err != nil {
- t.Fatal(err)
+// BlockTouch logs a "touch", runs the blockTouch hook (if set), and
+// then sets the block's mtime to now. It returns os.ErrNotExist if
+// the block is absent or trashed.
+func (v *stubVolume) BlockTouch(hash string) error {
+ v.log("touch", hash)
+ if v.blockTouch != nil {
+ if err := v.blockTouch(hash); err != nil {
+ return err
}
}
-
- // Set up a bogus ProcMounts file.
- f, err := ioutil.TempFile("", "keeptest")
- if err != nil {
- t.Fatal(err)
- }
- defer os.Remove(f.Name())
- for i, vol := range tempVols {
- // Add readonly mount points at odd indexes.
- var opts string
- switch i % 2 {
- case 0:
- opts = "rw,nosuid,nodev,noexec"
- case 1:
- opts = "nosuid,nodev,noexec,ro"
- }
- fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
+ v.mtx.Lock()
+ defer v.mtx.Unlock()
+ ent, ok := v.data[hash]
+ if !ok || !ent.trash.IsZero() {
+ return os.ErrNotExist
}
- f.Close()
- ProcMounts = f.Name()
+ ent.mtime = time.Now()
+ v.data[hash] = ent
+ return nil
+}
- trashLifetime = 24 * 60 * 60
- resultVols := volumeSet{}
- added := (&unixVolumeAdder{&resultVols}).Discover()
+// blockTouchWithTime sets the stored block's mtime to the (presumably old) time t; it runs no hook, does not check the trash time, and returns os.ErrNotExist only when hash has no entry.
+func (v *stubVolume) blockTouchWithTime(hash string, t time.Time) error {
+ v.log("touchwithtime", hash)
+ v.mtx.Lock()
+ defer v.mtx.Unlock()
+ ent, ok := v.data[hash]
+ if !ok {
+ return os.ErrNotExist
+ }
+ ent.mtime = t
+ v.data[hash] = ent // ent is a copy of the map value, so it has to be stored back
+ return nil
+}
- if added != len(resultVols) {
- t.Errorf("Discover returned %d, but added %d volumes",
- added, len(resultVols))
+func (v *stubVolume) BlockTrash(hash string) error { // BlockTrash logs the call, runs the optional blockTrash hook, then marks the stored block as trashed.
+ v.log("trash", hash)
+ if v.blockTrash != nil {
+ if err := v.blockTrash(hash); err != nil {
+ return err // a hook error is returned before v.data is consulted
+ }
}
- if added != len(tempVols) {
- t.Errorf("Discover returned %d but we set up %d volumes",
- added, len(tempVols))
+ v.mtx.Lock()
+ defer v.mtx.Unlock()
+ ent, ok := v.data[hash]
+ if !ok || !ent.trash.IsZero() { // a missing entry or one that is already trashed yields os.ErrNotExist
+ return os.ErrNotExist
}
- for i, tmpdir := range tempVols {
- if tmpdir != resultVols[i].(*UnixVolume).root {
- t.Errorf("Discover returned %s, expected %s\n",
- resultVols[i].(*UnixVolume).root, tmpdir)
- }
- if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
- t.Errorf("Discover added %s with readonly=%v, should be %v",
- tmpdir, !expectReadonly, expectReadonly)
+ ent.trash = time.Now().Add(v.params.Cluster.Collections.BlobTrashLifetime.Duration()) // trash time = now + configured BlobTrashLifetime; the data itself is kept
+ v.data[hash] = ent
+ return nil
+}
+
+func (v *stubVolume) BlockUntrash(hash string) error { // BlockUntrash logs the call, runs the optional blockUntrash hook, then clears the trash time of a trashed block.
+ v.log("untrash", hash)
+ if v.blockUntrash != nil {
+ if err := v.blockUntrash(hash); err != nil {
+ return err
}
}
+ v.mtx.Lock()
+ defer v.mtx.Unlock()
+ ent, ok := v.data[hash]
+ if !ok || ent.trash.IsZero() { // only an entry that is currently trashed can be untrashed; anything else yields os.ErrNotExist
+ return os.ErrNotExist
+ }
+ ent.trash = time.Time{} // the zero time is what the other methods test with IsZero to mean "not trashed"
+ v.data[hash] = ent
+ return nil
}
-func TestDiscoverNone(t *testing.T) {
- defer teardown()
-
- // Set up a bogus ProcMounts file with no Keep vols.
- f, err := ioutil.TempFile("", "keeptest")
- if err != nil {
- t.Fatal(err)
+func (v *stubVolume) Index(ctx context.Context, prefix string, writeTo io.Writer) error { // Index logs the call, runs the optional index hook, then writes one line per untrashed block whose hash starts with prefix.
+ v.stubLog.Printf("%s index %s", v.params.UUID, prefix)
+ if v.index != nil {
+ if err := v.index(ctx, prefix, writeTo); err != nil {
+ return err // a hook error is returned before the stub writes any of its own lines
+ }
}
- defer os.Remove(f.Name())
- fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
- fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
- fmt.Fprintln(f, "proc /proc proc opts 0 0")
- fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
- fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
- f.Close()
- ProcMounts = f.Name()
-
- trashLifetime = 24 * 60 * 60
- resultVols := volumeSet{}
- added := (&unixVolumeAdder{&resultVols}).Discover()
- if added != 0 || len(resultVols) != 0 {
- t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
- }
-}
-
-// TestIndex
-// Test an /index request.
-func TestIndex(t *testing.T) {
- defer teardown()
-
- // Set up Keep volumes and populate them.
- // Include multiple blocks on different volumes, and
- // some metadata files.
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
-
- vols := KeepVM.AllReadable()
- vols[0].Put(TestHash, TestBlock)
- vols[1].Put(TestHash2, TestBlock2)
- vols[0].Put(TestHash3, TestBlock3)
- vols[0].Put(TestHash+".meta", []byte("metadata"))
- vols[1].Put(TestHash2+".meta", []byte("metadata"))
-
- buf := new(bytes.Buffer)
- vols[0].IndexTo("", buf)
- vols[1].IndexTo("", buf)
- indexRows := strings.Split(string(buf.Bytes()), "\n")
- sort.Strings(indexRows)
- sortedIndex := strings.Join(indexRows, "\n")
- expected := `^\n` + TestHash + `\+\d+ \d+\n` +
- TestHash3 + `\+\d+ \d+\n` +
- TestHash2 + `\+\d+ \d+$`
-
- match, err := regexp.MatchString(expected, sortedIndex)
- if err == nil {
- if !match {
- t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
+ buf := &bytes.Buffer{} // lines are collected in memory so v.mtx is released before anything is written to writeTo
+ v.mtx.Lock()
+ for hash, ent := range v.data { // map iteration: the order of the emitted lines is unspecified
+ if ent.trash.IsZero() && strings.HasPrefix(hash, prefix) {
+ fmt.Fprintf(buf, "%s+%d %d\n", hash, len(ent.data), ent.mtime.UnixNano()) // "<hash>+<size in bytes> <mtime as Unix nanoseconds>"
}
- } else {
- t.Errorf("regexp.MatchString: %s", err)
}
+ v.mtx.Unlock()
+ _, err := io.Copy(writeTo, buf)
+ return err
}
-// ========================================
-// Helper functions for unit tests.
-// ========================================
-
-// MakeTestVolumeManager returns a RRVolumeManager with the specified
-// number of MockVolumes.
-func MakeTestVolumeManager(numVolumes int) VolumeManager {
- vols := make([]Volume, numVolumes)
- for i := range vols {
- vols[i] = CreateMockVolume()
+func (v *stubVolume) Mtime(hash string) (time.Time, error) { // Mtime logs the call, runs the optional mtime hook, then returns the stored block's mtime.
+ v.log("mtime", hash)
+ if v.mtime != nil {
+ if t, err := v.mtime(hash); err != nil { // the hook's time is used only when it fails; on success the stored mtime below is returned instead
+ return t, err
+ }
}
- return MakeRRVolumeManager(vols)
+ v.mtx.Lock()
+ defer v.mtx.Unlock()
+ ent, ok := v.data[hash]
+ if !ok || !ent.trash.IsZero() { // a missing entry and a trashed entry are both reported as not existing
+ return time.Time{}, os.ErrNotExist
+ }
+ return ent.mtime, nil
}
-// teardown cleans up after each test.
-func teardown() {
- dataManagerToken = ""
- enforcePermissions = false
- PermissionSecret = nil
- KeepVM = nil
+func (v *stubVolume) EmptyTrash() { // EmptyTrash logs the call and deletes every entry whose non-zero trash time has already passed.
+ v.stubLog.Printf("%s emptytrash", v.params.UUID)
+ v.mtx.Lock()
+ defer v.mtx.Unlock()
+ for hash, ent := range v.data {
+ if !ent.trash.IsZero() && time.Now().After(ent.trash) { // untrashed entries (zero trash time) and entries not yet due are kept
+ delete(v.data, hash) // removing entries from a map while ranging over it is permitted in Go
+ }
+ }
}