diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index 8682e23f56..79d51829fe 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -1,437 +1,874 @@
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
 
 import (
 	"bytes"
+	"context"
+	"crypto/md5"
+	"errors"
 	"fmt"
-	"io/ioutil"
+	"io"
+	"net/http"
 	"os"
-	"path"
-	"regexp"
 	"sort"
 	"strings"
+	"sync"
 	"testing"
+	"time"
+
+	"git.arvados.org/arvados.git/lib/config"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/arvadostest"
+	"git.arvados.org/arvados.git/sdk/go/auth"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"github.com/prometheus/client_golang/prometheus"
+	. "gopkg.in/check.v1"
 )
 
-var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
-var TestHash = "e4d909c290d0fb1ca068ffaddf22cbd0"
-var TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
-
-var TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
-var TestHash2 = "f15ac516f788aec4f30932ffb6395c39"
-
-var TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
-var TestHash3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
-
-// BadBlock is used to test collisions and corruption.
-// It must not match any test hashes.
-var BadBlock = []byte("The magic words are squeamish ossifrage.")
+func TestGocheck(t *testing.T) {
+	TestingT(t)
+}
 
-// TODO(twp): Tests still to be written
-//
-// * TestPutBlockFull
-//   - test that PutBlock returns 503 Full if the filesystem is full.
-//     (must mock FreeDiskSpace or Statfs? use a tmpfs?)
-//
-// * TestPutBlockWriteErr
-//   - test the behavior when Write returns an error.
-//   - Possible solutions: use a small tmpfs and a high
-//     MIN_FREE_KILOBYTES to trick PutBlock into attempting
-//     to write a block larger than the amount of space left
-//   - use an interface to mock ioutil.TempFile with a File
-//     object that always returns an error on write
-//
-// ========================================
-// GetBlock tests.
-// ========================================
+const (
+	fooHash = "acbd18db4cc2f85cedef654fccc4a4d8"
+	barHash = "37b51d194a7513e45b56f6524f2d51f2"
+)
 
-// TestGetBlock
-// Test that simple block reads succeed.
-//
-func TestGetBlock(t *testing.T) {
-	defer teardown()
+var testServiceURL = func() arvados.URL {
+	return arvados.URL{Host: "localhost:12345", Scheme: "http"}
+}()
 
-	// Prepare two test Keep volumes. Our block is stored on the second volume.
-	KeepVM = MakeTestVolumeManager(2)
-	defer KeepVM.Close()
+func authContext(token string) context.Context {
+	return auth.NewContext(context.TODO(), &auth.Credentials{Tokens: []string{token}})
+}
 
-	vols := KeepVM.AllReadable()
-	if err := vols[1].Put(TestHash, TestBlock); err != nil {
-		t.Error(err)
+func testCluster(t TB) *arvados.Cluster {
+	cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
+	if err != nil {
+		t.Fatal(err)
 	}
-
-	// Check that GetBlock returns success.
- result, err := GetBlock(TestHash) + cluster, err := cfg.GetCluster("") if err != nil { - t.Errorf("GetBlock error: %s", err) + t.Fatal(err) } - if fmt.Sprint(result) != fmt.Sprint(TestBlock) { - t.Errorf("expected %s, got %s", TestBlock, result) + cluster.SystemRootToken = arvadostest.SystemRootToken + cluster.ManagementToken = arvadostest.ManagementToken + return cluster +} + +func testKeepstore(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*keepstore, context.CancelFunc) { + if reg == nil { + reg = prometheus.NewRegistry() + } + ctx, cancel := context.WithCancel(context.Background()) + ctx = ctxlog.Context(ctx, ctxlog.TestLogger(t)) + ks, err := newKeepstore(ctx, cluster, cluster.SystemRootToken, reg, testServiceURL) + if err != nil { + t.Fatal(err) } + return ks, cancel } -// TestGetBlockMissing -// GetBlock must return an error when the block is not found. -// -func TestGetBlockMissing(t *testing.T) { - defer teardown() +var _ = Suite(&keepstoreSuite{}) - // Create two empty test Keep volumes. - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() +type keepstoreSuite struct { + cluster *arvados.Cluster +} - // Check that GetBlock returns failure. - result, err := GetBlock(TestHash) - if err != NotFoundError { - t.Errorf("Expected NotFoundError, got %v", result) +func (s *keepstoreSuite) SetUpTest(c *C) { + s.cluster = testCluster(c) + s.cluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, + "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"}, } } -// TestGetBlockCorrupt -// GetBlock must return an error when a corrupted block is requested -// (the contents of the file do not checksum to its hash). -// -func TestGetBlockCorrupt(t *testing.T) { - defer teardown() +func (s *keepstoreSuite) TestBlockRead_ChecksumMismatch(c *C) { + ks, cancel := testKeepstore(c, s.cluster, nil) + defer cancel() + + ctx := authContext(arvadostest.ActiveTokenV2) + + fooHash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + err := ks.mountsW[0].BlockWrite(ctx, fooHash, []byte("bar")) + c.Assert(err, IsNil) + + _, err = ks.BlockWrite(ctx, arvados.BlockWriteOptions{ + Hash: fooHash, + Data: []byte("foo"), + }) + c.Check(err, ErrorMatches, "hash collision") + + buf := bytes.NewBuffer(nil) + _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"), + WriteTo: buf, + }) + c.Check(err, ErrorMatches, "checksum mismatch in stored data") + c.Check(buf.String(), Not(Equals), "foo") + c.Check(buf.Len() < 3, Equals, true) + + err = ks.mountsW[1].BlockWrite(ctx, fooHash, []byte("foo")) + c.Assert(err, IsNil) + + buf = bytes.NewBuffer(nil) + _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"), + WriteTo: buf, + }) + c.Check(err, ErrorMatches, "checksum mismatch in stored data") + c.Check(buf.Len() < 3, Equals, true) +} - // Create two test Keep volumes and store a corrupt block in one. 
-	KeepVM = MakeTestVolumeManager(2)
-	defer KeepVM.Close()
+func (s *keepstoreSuite) TestBlockReadWrite_SigningDisabled(c *C) {
+	origKey := s.cluster.Collections.BlobSigningKey
+	s.cluster.Collections.BlobSigning = false
+	s.cluster.Collections.BlobSigningKey = ""
+	ks, cancel := testKeepstore(c, s.cluster, nil)
+	defer cancel()
+
+	resp, err := ks.BlockWrite(authContext("abcde"), arvados.BlockWriteOptions{
+		Hash: fooHash,
+		Data: []byte("foo"),
+	})
+	c.Assert(err, IsNil)
+	c.Check(resp.Locator, Equals, fooHash+"+3")
+	locUnsigned := resp.Locator
+	ttl := time.Hour
+	locSigned := arvados.SignLocator(locUnsigned, arvadostest.ActiveTokenV2, time.Now().Add(ttl), ttl, []byte(origKey))
+	c.Assert(locSigned, Not(Equals), locUnsigned)
+
+	for _, locator := range []string{locUnsigned, locSigned} {
+		for _, token := range []string{"", "xyzzy", arvadostest.ActiveTokenV2} {
+			c.Logf("=== locator %q token %q", locator, token)
+			ctx := authContext(token)
+			buf := bytes.NewBuffer(nil)
+			_, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
+				Locator: locator,
+				WriteTo: buf,
+			})
+			c.Check(err, IsNil)
+			c.Check(buf.String(), Equals, "foo")
+		}
+	}
+}
 
-	vols := KeepVM.AllReadable()
-	vols[0].Put(TestHash, BadBlock)
+func (s *keepstoreSuite) TestBlockRead_OrderedByStorageClassPriority(c *C) {
+	s.cluster.Volumes = map[string]arvados.Volume{
+		"zzzzz-nyw5e-111111111111111": {
+			Driver:         "stub",
+			Replication:    1,
+			StorageClasses: map[string]bool{"class1": true}},
+		"zzzzz-nyw5e-222222222222222": {
+			Driver:         "stub",
+			Replication:    1,
+			StorageClasses: map[string]bool{"class2": true, "class3": true}},
+	}
+
+	// "foobar" is just some data that happens to result in
+	// rendezvous order {111, 222}
+	data := []byte("foobar")
+	hash := fmt.Sprintf("%x", md5.Sum(data))
+
+	for _, trial := range []struct {
+		priority1 int // priority of class1, thus vol1
+		priority2 int // priority of class2
+		priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
+		expectLog string
+	}{
+		{100, 50, 50, "111 read 385\n"},              // class1 has higher priority => try vol1 first, no need to try vol2
+		{100, 100, 100, "111 read 385\n"},            // same priority, vol1 is first in rendezvous order => try vol1 first and succeed
+		{66, 99, 33, "222 read 385\n111 read 385\n"}, // class2 has higher priority => try vol2 first, then try vol1
+		{66, 33, 99, "222 read 385\n111 read 385\n"}, // class3 (on vol2) has highest priority => try vol2 first, then try vol1
+	} {
+		c.Logf("=== %+v", trial)
+
+		s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
+			"class1": {Priority: trial.priority1},
+			"class2": {Priority: trial.priority2},
+			"class3": {Priority: trial.priority3},
+		}
+		ks, cancel := testKeepstore(c, s.cluster, nil)
+		defer cancel()
+
+		ctx := authContext(arvadostest.ActiveTokenV2)
+		resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+			Hash:           hash,
+			Data:           data,
+			StorageClasses: []string{"class1"},
+		})
+		c.Assert(err, IsNil)
+
+		// Combine logs into one. (We only want the logs from
+		// the BlockRead below, not from BlockWrite above.)
+		stubLog := &stubLog{}
+		for _, mnt := range ks.mounts {
+			mnt.volume.(*stubVolume).stubLog = stubLog
+		}
 
-	// Check that GetBlock returns failure.
- result, err := GetBlock(TestHash) - if err != DiskHashError { - t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result) + n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: resp.Locator, + WriteTo: io.Discard, + }) + c.Assert(n, Equals, len(data)) + c.Assert(err, IsNil) + c.Check(stubLog.String(), Equals, trial.expectLog) } } -// ======================================== -// PutBlock tests -// ======================================== +func (s *keepstoreSuite) TestBlockWrite_NoWritableVolumes(c *C) { + for uuid, v := range s.cluster.Volumes { + v.ReadOnly = true + s.cluster.Volumes[uuid] = v + } + ks, cancel := testKeepstore(c, s.cluster, nil) + defer cancel() + for _, mnt := range ks.mounts { + mnt.volume.(*stubVolume).blockWrite = func(context.Context, string, []byte) error { + c.Error("volume BlockWrite called") + return errors.New("fail") + } + } + ctx := authContext(arvadostest.ActiveTokenV2) -// TestPutBlockOK -// PutBlock can perform a simple block write and returns success. -// -func TestPutBlockOK(t *testing.T) { - defer teardown() + _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{ + Hash: fooHash, + Data: []byte("foo")}) + c.Check(err, NotNil) + c.Check(err.(interface{ HTTPStatus() int }).HTTPStatus(), Equals, http.StatusInsufficientStorage) +} - // Create two test Keep volumes. - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() +func (s *keepstoreSuite) TestBlockWrite_MultipleStorageClasses(c *C) { + s.cluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-111111111111111": { + Driver: "stub", + Replication: 1, + StorageClasses: map[string]bool{"class1": true}}, + "zzzzz-nyw5e-121212121212121": { + Driver: "stub", + Replication: 1, + StorageClasses: map[string]bool{"class1": true, "class2": true}}, + "zzzzz-nyw5e-222222222222222": { + Driver: "stub", + Replication: 1, + StorageClasses: map[string]bool{"class2": true}}, + } + + // testData is a block that happens to have rendezvous order 111, 121, 222 + testData := []byte("qux") + testHash := fmt.Sprintf("%x+%d", md5.Sum(testData), len(testData)) + + s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{ + "class1": {}, + "class2": {}, + "class3": {}, + } + + ctx := authContext(arvadostest.ActiveTokenV2) + for idx, trial := range []struct { + classes string // desired classes + expectLog string + }{ + {"class1", "" + + "111 read d85\n" + + "121 read d85\n" + + "111 write d85\n" + + "111 read d85\n" + + "111 touch d85\n"}, + {"class2", "" + + "121 read d85\n" + // write#1 + "222 read d85\n" + + "121 write d85\n" + + "121 read d85\n" + // write#2 + "121 touch d85\n"}, + {"class1,class2", "" + + "111 read d85\n" + // write#1 + "121 read d85\n" + + "222 read d85\n" + + "121 write d85\n" + + "111 write d85\n" + + "111 read d85\n" + // write#2 + "111 touch d85\n" + + "121 read d85\n" + + "121 touch d85\n"}, + {"class1,class2,class404", "" + + "111 read d85\n" + // write#1 + "121 read d85\n" + + "222 read d85\n" + + "121 write d85\n" + + "111 write d85\n" + + "111 read d85\n" + // write#2 + "111 touch d85\n" + + "121 read d85\n" + + "121 touch d85\n"}, + } { + c.Logf("=== %d: %+v", idx, trial) + + ks, cancel := testKeepstore(c, s.cluster, nil) + defer cancel() + stubLog := &stubLog{} + for _, mnt := range ks.mounts { + mnt.volume.(*stubVolume).stubLog = stubLog + } - // Check that PutBlock stores the data as expected. 
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 { - t.Fatalf("PutBlock: n %d err %v", n, err) + // Check that we chose the right block data + rvz := ks.rendezvous(testHash, ks.mountsW) + c.Assert(rvz[0].UUID[24:], Equals, "111") + c.Assert(rvz[1].UUID[24:], Equals, "121") + c.Assert(rvz[2].UUID[24:], Equals, "222") + + for i := 0; i < 2; i++ { + _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{ + Hash: testHash, + Data: testData, + StorageClasses: strings.Split(trial.classes, ","), + }) + c.Check(err, IsNil) + } + c.Check(stubLog.String(), Equals, trial.expectLog) } +} - vols := KeepVM.AllReadable() - result, err := vols[1].Get(TestHash) - if err != nil { - t.Fatalf("Volume #0 Get returned error: %v", err) +func (s *keepstoreSuite) TestBlockTrash(c *C) { + s.cluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, + "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"}, + "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true}, + "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true}, + } + ks, cancel := testKeepstore(c, s.cluster, nil) + defer cancel() + + var vol []*stubVolume + for _, mount := range ks.mountsR { + vol = append(vol, mount.volume.(*stubVolume)) + } + sort.Slice(vol, func(i, j int) bool { + return vol[i].params.UUID < vol[j].params.UUID + }) + + ctx := context.Background() + loc := fooHash + "+3" + tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second) + + clear := func() { + for _, vol := range vol { + err := vol.BlockTrash(fooHash) + if !os.IsNotExist(err) { + c.Assert(err, IsNil) + } + } + } + writeit := func(volidx int) { + err := vol[volidx].BlockWrite(ctx, fooHash, []byte("foo")) + c.Assert(err, IsNil) + err = vol[volidx].blockTouchWithTime(fooHash, tOld) + c.Assert(err, IsNil) } - if string(result) != string(TestBlock) { - t.Fatalf("PutBlock stored '%s', Get retrieved '%s'", - string(TestBlock), string(result)) + trashit := func() error { + return ks.BlockTrash(ctx, loc) } + checkexists := func(volidx int) bool { + err := vol[volidx].BlockRead(ctx, fooHash, brdiscard) + if !os.IsNotExist(err) { + c.Check(err, IsNil) + } + return err == nil + } + + clear() + c.Check(trashit(), Equals, os.ErrNotExist) + + // one old replica => trash it + clear() + writeit(0) + c.Check(trashit(), IsNil) + c.Check(checkexists(0), Equals, false) + + // one old replica + one new replica => keep new, trash old + clear() + writeit(0) + writeit(1) + c.Check(vol[1].blockTouchWithTime(fooHash, time.Now()), IsNil) + c.Check(trashit(), IsNil) + c.Check(checkexists(0), Equals, false) + c.Check(checkexists(1), Equals, true) + + // two old replicas => trash both + clear() + writeit(0) + writeit(1) + c.Check(trashit(), IsNil) + c.Check(checkexists(0), Equals, false) + c.Check(checkexists(1), Equals, false) + + // four old replicas => trash all except readonly volume with + // AllowTrashWhenReadOnly==false + clear() + writeit(0) + writeit(1) + writeit(2) + writeit(3) + c.Check(trashit(), IsNil) + c.Check(checkexists(0), Equals, false) + c.Check(checkexists(1), Equals, false) + c.Check(checkexists(2), Equals, true) + c.Check(checkexists(3), Equals, false) + + // two old replicas but one returns an error => return the + // only non-404 backend error + clear() + vol[0].blockTrash = func(hash string) error { + return errors.New("fake error") + } + writeit(0) + writeit(3) + c.Check(trashit(), ErrorMatches, 
"fake error") + c.Check(checkexists(0), Equals, true) + c.Check(checkexists(1), Equals, false) + c.Check(checkexists(2), Equals, false) + c.Check(checkexists(3), Equals, false) } -// TestPutBlockOneVol -// PutBlock still returns success even when only one of the known -// volumes is online. -// -func TestPutBlockOneVol(t *testing.T) { - defer teardown() - - // Create two test Keep volumes, but cripple one of them. - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() - - vols := KeepVM.AllWritable() - vols[0].(*MockVolume).Bad = true - - // Check that PutBlock stores the data as expected. - if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 { - t.Fatalf("PutBlock: n %d err %v", n, err) +func (s *keepstoreSuite) TestBlockWrite_OnlyOneBuffer(c *C) { + s.cluster.API.MaxKeepBlobBuffers = 1 + ks, cancel := testKeepstore(c, s.cluster, nil) + defer cancel() + ok := make(chan struct{}) + go func() { + defer close(ok) + ctx := authContext(arvadostest.ActiveTokenV2) + _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{ + Hash: fooHash, + Data: []byte("foo")}) + c.Check(err, IsNil) + }() + select { + case <-ok: + case <-time.After(time.Second): + c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1") } +} - result, err := GetBlock(TestHash) - if err != nil { - t.Fatalf("GetBlock: %v", err) - } - if string(result) != string(TestBlock) { - t.Error("PutBlock/GetBlock mismatch") - t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'", - string(TestBlock), string(result)) +func (s *keepstoreSuite) TestBufferPoolLeak(c *C) { + s.cluster.API.MaxKeepBlobBuffers = 4 + ks, cancel := testKeepstore(c, s.cluster, nil) + defer cancel() + + ctx := authContext(arvadostest.ActiveTokenV2) + var wg sync.WaitGroup + for range make([]int, 20) { + wg.Add(1) + go func() { + defer wg.Done() + resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{ + Hash: fooHash, + Data: []byte("foo")}) + c.Check(err, IsNil) + _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: resp.Locator, + WriteTo: io.Discard}) + c.Check(err, IsNil) + }() + } + ok := make(chan struct{}) + go func() { + wg.Wait() + close(ok) + }() + select { + case <-ok: + case <-time.After(time.Second): + c.Fatal("read/write sequence deadlocks, likely buffer pool leak") } } -// TestPutBlockMD5Fail -// Check that PutBlock returns an error if passed a block and hash that -// do not match. 
-// -func TestPutBlockMD5Fail(t *testing.T) { - defer teardown() +func (s *keepstoreSuite) TestPutStorageClasses(c *C) { + s.cluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, // "default" is implicit + "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"special": true, "extra": true}}, + "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true}, + } + ks, cancel := testKeepstore(c, s.cluster, nil) + defer cancel() + ctx := authContext(arvadostest.ActiveTokenV2) + + for _, trial := range []struct { + ask []string + expectReplicas int + expectClasses map[string]int + }{ + {nil, + 1, + map[string]int{"default": 1}}, + {[]string{}, + 1, + map[string]int{"default": 1}}, + {[]string{"default"}, + 1, + map[string]int{"default": 1}}, + {[]string{"default", "default"}, + 1, + map[string]int{"default": 1}}, + {[]string{"special"}, + 1, + map[string]int{"extra": 1, "special": 1}}, + {[]string{"special", "readonly"}, + 1, + map[string]int{"extra": 1, "special": 1}}, + {[]string{"special", "nonexistent"}, + 1, + map[string]int{"extra": 1, "special": 1}}, + {[]string{"extra", "special"}, + 1, + map[string]int{"extra": 1, "special": 1}}, + {[]string{"default", "special"}, + 2, + map[string]int{"default": 1, "extra": 1, "special": 1}}, + } { + c.Logf("success case %#v", trial) + resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{ + Hash: fooHash, + Data: []byte("foo"), + StorageClasses: trial.ask, + }) + if !c.Check(err, IsNil) { + continue + } + c.Check(resp.Replicas, Equals, trial.expectReplicas) + if len(trial.expectClasses) == 0 { + // any non-empty value is correct + c.Check(resp.StorageClasses, Not(HasLen), 0) + } else { + c.Check(resp.StorageClasses, DeepEquals, trial.expectClasses) + } + } - // Create two test Keep volumes. - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() + for _, ask := range [][]string{ + {"doesnotexist"}, + {"doesnotexist", "readonly"}, + {"readonly"}, + } { + c.Logf("failure case %s", ask) + _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{ + Hash: fooHash, + Data: []byte("foo"), + StorageClasses: ask, + }) + c.Check(err, NotNil) + } +} - // Check that PutBlock returns the expected error when the hash does - // not match the block. - if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError { - t.Error("Expected RequestHashError, got %v", err) +func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) { + for uuid, v := range s.cluster.Volumes { + v.ReadOnly = true + s.cluster.Volumes[uuid] = v } + ks, cancel := testKeepstore(c, s.cluster, nil) + defer cancel() - // Confirm that GetBlock fails to return anything. - if result, err := GetBlock(TestHash); err != NotFoundError { - t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)", - string(result), err) + for _, mnt := range ks.mounts { + err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo")) + c.Assert(err, IsNil) + err = mnt.BlockRead(context.Background(), fooHash, brdiscard) + c.Assert(err, IsNil) } -} -// TestPutBlockCorrupt -// PutBlock should overwrite corrupt blocks on disk when given -// a PUT request with a good block. -// -func TestPutBlockCorrupt(t *testing.T) { - defer teardown() + err := ks.BlockUntrash(context.Background(), fooHash) + c.Check(os.IsNotExist(err), Equals, true) - // Create two test Keep volumes. 
- KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() + for _, mnt := range ks.mounts { + err := mnt.BlockRead(context.Background(), fooHash, brdiscard) + c.Assert(err, IsNil) + } +} - // Store a corrupted block under TestHash. - vols := KeepVM.AllWritable() - vols[0].Put(TestHash, BadBlock) - if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 { - t.Errorf("PutBlock: n %d err %v", n, err) +func (s *keepstoreSuite) TestBlockWrite_SkipReadOnly(c *C) { + s.cluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, + "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", ReadOnly: true}, + "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true}, } + ks, cancel := testKeepstore(c, s.cluster, nil) + defer cancel() + ctx := authContext(arvadostest.ActiveTokenV2) - // The block on disk should now match TestBlock. - if block, err := GetBlock(TestHash); err != nil { - t.Errorf("GetBlock: %v", err) - } else if bytes.Compare(block, TestBlock) != 0 { - t.Errorf("GetBlock returned: '%s'", string(block)) + for i := range make([]byte, 32) { + data := []byte(fmt.Sprintf("block %d", i)) + _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{Data: data}) + c.Assert(err, IsNil) } + c.Check(ks.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume).stubLog.String(), Matches, "(?ms).*write.*") + c.Check(ks.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume).stubLog.String(), HasLen, 0) + c.Check(ks.mounts["zzzzz-nyw5e-222222222222222"].volume.(*stubVolume).stubLog.String(), HasLen, 0) } -// TestPutBlockCollision -// PutBlock returns a 400 Collision error when attempting to -// store a block that collides with another block on disk. -// -func TestPutBlockCollision(t *testing.T) { - defer teardown() +func (s *keepstoreSuite) TestParseLocator(c *C) { + for _, trial := range []struct { + locator string + ok bool + expect locatorInfo + }{ + {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + ok: true}, + {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234", + ok: true, expect: locatorInfo{size: 1234}}, + {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Abcdef@abcdef", + ok: true, expect: locatorInfo{size: 1234, signed: true}}, + {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Rzzzzz-abcdef", + ok: true, expect: locatorInfo{size: 1234, remote: true}}, + {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345+Zexample+Rzzzzz-abcdef", + ok: true, expect: locatorInfo{size: 12345, remote: true}}, + // invalid: hash length != 32 + {locator: "", + ok: false}, + {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + ok: false}, + {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234", + ok: false}, + {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb", + ok: false}, + {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb+1234", + ok: false}, + // invalid: first hint is not size + {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+Abcdef+1234", + ok: false}, + } { + c.Logf("=== %s", trial.locator) + li, err := getLocatorInfo(trial.locator) + if !trial.ok { + c.Check(err, NotNil) + continue + } + c.Check(err, IsNil) + c.Check(li.hash, Equals, trial.locator[:32]) + c.Check(li.size, Equals, trial.expect.size) + c.Check(li.signed, Equals, trial.expect.signed) + c.Check(li.remote, Equals, trial.expect.remote) + } +} - // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41. 
- var b1 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9epO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\\\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef") - var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef") - var locator = "cee9a457e790cf20d4bdaa6d69f01e41" +func init() { + driver["stub"] = func(params newVolumeParams) (volume, error) { + v := &stubVolume{ + params: params, + data: make(map[string]stubData), + stubLog: &stubLog{}, + } + return v, nil + } +} - // Prepare two test Keep volumes. - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() +type stubLog struct { + sync.Mutex + bytes.Buffer +} - // Store one block, then attempt to store the other. Confirm that - // PutBlock reported a CollisionError. - if _, err := PutBlock(b1, locator); err != nil { - t.Error(err) - } - if _, err := PutBlock(b2, locator); err == nil { - t.Error("PutBlock did not report a collision") - } else if err != CollisionError { - t.Errorf("PutBlock returned %v", err) +func (sl *stubLog) Printf(format string, args ...interface{}) { + if sl == nil { + return } + sl.Lock() + defer sl.Unlock() + fmt.Fprintf(sl, format+"\n", args...) } -// TestPutBlockTouchFails -// When PutBlock is asked to PUT an existing block, but cannot -// modify the timestamp, it should write a second block. -// -func TestPutBlockTouchFails(t *testing.T) { - defer teardown() - - // Prepare two test Keep volumes. - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() - vols := KeepVM.AllWritable() - - // Store a block and then make the underlying volume bad, - // so a subsequent attempt to update the file timestamp - // will fail. - vols[0].Put(TestHash, BadBlock) - oldMtime, err := vols[0].Mtime(TestHash) - if err != nil { - t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err) - } +type stubData struct { + mtime time.Time + data []byte + trash time.Time +} - // vols[0].Touch will fail on the next call, so the volume - // manager will store a copy on vols[1] instead. - vols[0].(*MockVolume).Touchable = false - if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 { - t.Fatalf("PutBlock: n %d err %v", n, err) - } - vols[0].(*MockVolume).Touchable = true +type stubVolume struct { + params newVolumeParams + data map[string]stubData + stubLog *stubLog + mtx sync.Mutex + + // The following funcs enable tests to insert delays and + // failures. Each volume operation begins by calling the + // corresponding func (if non-nil). If the func returns an + // error, that error is returned to caller. Otherwise, the + // stub continues normally. + blockRead func(ctx context.Context, hash string, writeTo io.WriterAt) error + blockWrite func(ctx context.Context, hash string, data []byte) error + deviceID func() string + blockTouch func(hash string) error + blockTrash func(hash string) error + blockUntrash func(hash string) error + index func(ctx context.Context, prefix string, writeTo io.Writer) error + mtime func(hash string) (time.Time, error) + emptyTrash func() +} - // Now the mtime on the block on vols[0] should be unchanged, and - // there should be a copy of the block on vols[1]. 
- newMtime, err := vols[0].Mtime(TestHash) - if err != nil { - t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err) +func (v *stubVolume) log(op, hash string) { + // Note this intentionally crashes if UUID or hash is short -- + // if keepstore ever does that, tests should fail. + v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3]) +} + +func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error { + v.log("read", hash) + if v.blockRead != nil { + err := v.blockRead(ctx, hash, writeTo) + if err != nil { + return err + } } - if !newMtime.Equal(oldMtime) { - t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n", - oldMtime, newMtime) + v.mtx.Lock() + ent, ok := v.data[hash] + v.mtx.Unlock() + if !ok || !ent.trash.IsZero() { + return os.ErrNotExist + } + wrote := 0 + for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 { + data := ent.data[wrote:] + if len(data) > writesize { + data = data[:writesize] + } + n, err := writeTo.WriteAt(data, int64(wrote)) + wrote += n + if err != nil { + return err + } } - result, err := vols[1].Get(TestHash) - if err != nil { - t.Fatalf("vols[1]: %v", err) + return nil +} + +func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { + v.log("write", hash) + if v.blockWrite != nil { + if err := v.blockWrite(ctx, hash, data); err != nil { + return err + } } - if bytes.Compare(result, TestBlock) != 0 { - t.Errorf("new block does not match test block\nnew block = %v\n", result) + v.mtx.Lock() + defer v.mtx.Unlock() + v.data[hash] = stubData{ + mtime: time.Now(), + data: append([]byte(nil), data...), } + return nil } -func TestDiscoverTmpfs(t *testing.T) { - var tempVols [4]string - var err error +func (v *stubVolume) DeviceID() string { + return fmt.Sprintf("%p", v) +} - // Create some directories suitable for using as keep volumes. - for i := range tempVols { - if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil { - t.Fatal(err) - } - defer os.RemoveAll(tempVols[i]) - tempVols[i] = tempVols[i] + "/keep" - if err = os.Mkdir(tempVols[i], 0755); err != nil { - t.Fatal(err) +func (v *stubVolume) BlockTouch(hash string) error { + v.log("touch", hash) + if v.blockTouch != nil { + if err := v.blockTouch(hash); err != nil { + return err } } - - // Set up a bogus ProcMounts file. - f, err := ioutil.TempFile("", "keeptest") - if err != nil { - t.Fatal(err) - } - defer os.Remove(f.Name()) - for i, vol := range tempVols { - // Add readonly mount points at odd indexes. - var opts string - switch i % 2 { - case 0: - opts = "rw,nosuid,nodev,noexec" - case 1: - opts = "nosuid,nodev,noexec,ro" - } - fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts) + v.mtx.Lock() + defer v.mtx.Unlock() + ent, ok := v.data[hash] + if !ok || !ent.trash.IsZero() { + return os.ErrNotExist } - f.Close() - ProcMounts = f.Name() + ent.mtime = time.Now() + v.data[hash] = ent + return nil +} - resultVols := volumeSet{} - added := (&unixVolumeAdder{&resultVols}).Discover() +// Set mtime to the (presumably old) specified time. 
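+// Tests use this to backdate a replica past the signing TTL so it
+// becomes eligible for trash; for example, TestBlockTrash above does:
+//
+//	tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
+//	err = vol[volidx].blockTouchWithTime(fooHash, tOld)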
+func (v *stubVolume) blockTouchWithTime(hash string, t time.Time) error { + v.log("touchwithtime", hash) + v.mtx.Lock() + defer v.mtx.Unlock() + ent, ok := v.data[hash] + if !ok { + return os.ErrNotExist + } + ent.mtime = t + v.data[hash] = ent + return nil +} - if added != len(resultVols) { - t.Errorf("Discover returned %d, but added %d volumes", - added, len(resultVols)) +func (v *stubVolume) BlockTrash(hash string) error { + v.log("trash", hash) + if v.blockTrash != nil { + if err := v.blockTrash(hash); err != nil { + return err + } } - if added != len(tempVols) { - t.Errorf("Discover returned %d but we set up %d volumes", - added, len(tempVols)) + v.mtx.Lock() + defer v.mtx.Unlock() + ent, ok := v.data[hash] + if !ok || !ent.trash.IsZero() { + return os.ErrNotExist } - for i, tmpdir := range tempVols { - if tmpdir != resultVols[i].(*UnixVolume).root { - t.Errorf("Discover returned %s, expected %s\n", - resultVols[i].(*UnixVolume).root, tmpdir) - } - if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly { - t.Errorf("Discover added %s with readonly=%v, should be %v", - tmpdir, !expectReadonly, expectReadonly) + ent.trash = time.Now().Add(v.params.Cluster.Collections.BlobTrashLifetime.Duration()) + v.data[hash] = ent + return nil +} + +func (v *stubVolume) BlockUntrash(hash string) error { + v.log("untrash", hash) + if v.blockUntrash != nil { + if err := v.blockUntrash(hash); err != nil { + return err } } + v.mtx.Lock() + defer v.mtx.Unlock() + ent, ok := v.data[hash] + if !ok || ent.trash.IsZero() { + return os.ErrNotExist + } + ent.trash = time.Time{} + v.data[hash] = ent + return nil } -func TestDiscoverNone(t *testing.T) { - defer teardown() - - // Set up a bogus ProcMounts file with no Keep vols. - f, err := ioutil.TempFile("", "keeptest") - if err != nil { - t.Fatal(err) +func (v *stubVolume) Index(ctx context.Context, prefix string, writeTo io.Writer) error { + v.stubLog.Printf("%s index %s", v.params.UUID, prefix) + if v.index != nil { + if err := v.index(ctx, prefix, writeTo); err != nil { + return err + } } - defer os.Remove(f.Name()) - fmt.Fprintln(f, "rootfs / rootfs opts 0 0") - fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0") - fmt.Fprintln(f, "proc /proc proc opts 0 0") - fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0") - fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0") - f.Close() - ProcMounts = f.Name() - - resultVols := volumeSet{} - added := (&unixVolumeAdder{&resultVols}).Discover() - if added != 0 || len(resultVols) != 0 { - t.Fatalf("got %d, %v; expected 0, []", added, resultVols) - } -} - -// TestIndex -// Test an /index request. -func TestIndex(t *testing.T) { - defer teardown() - - // Set up Keep volumes and populate them. - // Include multiple blocks on different volumes, and - // some metadata files. 
- KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() - - vols := KeepVM.AllReadable() - vols[0].Put(TestHash, TestBlock) - vols[1].Put(TestHash2, TestBlock2) - vols[0].Put(TestHash3, TestBlock3) - vols[0].Put(TestHash+".meta", []byte("metadata")) - vols[1].Put(TestHash2+".meta", []byte("metadata")) - - buf := new(bytes.Buffer) - vols[0].IndexTo("", buf) - vols[1].IndexTo("", buf) - indexRows := strings.Split(string(buf.Bytes()), "\n") - sort.Strings(indexRows) - sortedIndex := strings.Join(indexRows, "\n") - expected := `^\n` + TestHash + `\+\d+ \d+\n` + - TestHash3 + `\+\d+ \d+\n` + - TestHash2 + `\+\d+ \d+$` - - match, err := regexp.MatchString(expected, sortedIndex) - if err == nil { - if !match { - t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes())) + buf := &bytes.Buffer{} + v.mtx.Lock() + for hash, ent := range v.data { + if ent.trash.IsZero() && strings.HasPrefix(hash, prefix) { + fmt.Fprintf(buf, "%s+%d %d\n", hash, len(ent.data), ent.mtime.UnixNano()) } - } else { - t.Errorf("regexp.MatchString: %s", err) } + v.mtx.Unlock() + _, err := io.Copy(writeTo, buf) + return err } -// ======================================== -// Helper functions for unit tests. -// ======================================== - -// MakeTestVolumeManager returns a RRVolumeManager with the specified -// number of MockVolumes. -func MakeTestVolumeManager(numVolumes int) VolumeManager { - vols := make([]Volume, numVolumes) - for i := range vols { - vols[i] = CreateMockVolume() +func (v *stubVolume) Mtime(hash string) (time.Time, error) { + v.log("mtime", hash) + if v.mtime != nil { + if t, err := v.mtime(hash); err != nil { + return t, err + } } - return MakeRRVolumeManager(vols) + v.mtx.Lock() + defer v.mtx.Unlock() + ent, ok := v.data[hash] + if !ok || !ent.trash.IsZero() { + return time.Time{}, os.ErrNotExist + } + return ent.mtime, nil } -// teardown cleans up after each test. -func teardown() { - dataManagerToken = "" - enforcePermissions = false - PermissionSecret = nil - KeepVM = nil +func (v *stubVolume) EmptyTrash() { + v.stubLog.Printf("%s emptytrash", v.params.UUID) + v.mtx.Lock() + defer v.mtx.Unlock() + for hash, ent := range v.data { + if !ent.trash.IsZero() && time.Now().After(ent.trash) { + delete(v.data, hash) + } + } }
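+
+// The hook fields on stubVolume (blockRead, blockWrite, etc.) exist so
+// tests can inject delays and failures without defining a new volume
+// type. TestBlockTrash above uses one to force a trash error:
+//
+//	vol[0].blockTrash = func(hash string) error {
+//		return errors.New("fake error")
+//	}
+//
+// A hook that returns nil falls through to the normal stub behavior, so
+// a hypothetical slow-backend test (not part of this suite) could be
+// sketched as:
+//
+//	vol.blockRead = func(ctx context.Context, hash string, _ io.WriterAt) error {
+//		time.Sleep(100 * time.Millisecond) // simulate backend latency
+//		return nil                         // then serve the stored data normally
+//	}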