X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/dfe0ec7bfec3fd72cd40d3962e5c8af08d2413d2..68cde7e2a496c4e57cb1576bb8bf29415d3c0b67:/services/keepstore/volume_unix_test.go diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go index 887247d3c3..7f1cd21964 100644 --- a/services/keepstore/volume_unix_test.go +++ b/services/keepstore/volume_unix_test.go @@ -1,7 +1,13 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( "bytes" + "context" + "encoding/json" "errors" "fmt" "io" @@ -12,6 +18,9 @@ import ( "syscall" "testing" "time" + + "github.com/ghodss/yaml" + check "gopkg.in/check.v1" ) type TestableUnixVolume struct { @@ -45,7 +54,7 @@ func (v *TestableUnixVolume) PutRaw(locator string, data []byte) { v.ReadOnly = orig }(v.ReadOnly) v.ReadOnly = false - err := v.Put(locator, data) + err := v.Put(context.Background(), locator, data) if err != nil { v.t.Fatal(err) } @@ -117,10 +126,10 @@ func TestReplicationDefault1(t *testing.T) { func TestGetNotFound(t *testing.T) { v := NewTestableUnixVolume(t, false, false) defer v.Teardown() - v.Put(TestHash, TestBlock) + v.Put(context.Background(), TestHash, TestBlock) buf := make([]byte, BlockSize) - n, err := v.Get(TestHash2, buf) + n, err := v.Get(context.Background(), TestHash2, buf) switch { case os.IsNotExist(err): break @@ -135,7 +144,7 @@ func TestPut(t *testing.T) { v := NewTestableUnixVolume(t, false, false) defer v.Teardown() - err := v.Put(TestHash, TestBlock) + err := v.Put(context.Background(), TestHash, TestBlock) if err != nil { t.Error(err) } @@ -153,7 +162,7 @@ func TestPutBadVolume(t *testing.T) { defer v.Teardown() os.Chmod(v.Root, 000) - err := v.Put(TestHash, TestBlock) + err := v.Put(context.Background(), TestHash, TestBlock) if err == nil { t.Error("Write should have failed") } @@ -166,12 +175,12 @@ func TestUnixVolumeReadonly(t *testing.T) { v.PutRaw(TestHash, TestBlock) buf := make([]byte, BlockSize) - _, err := v.Get(TestHash, buf) + _, err := v.Get(context.Background(), TestHash, buf) if err != nil { t.Errorf("got err %v, expected nil", err) } - err = v.Put(TestHash, TestBlock) + err = v.Put(context.Background(), TestHash, TestBlock) if err != MethodDisabledError { t.Errorf("got err %v, expected MethodDisabledError", err) } @@ -231,9 +240,9 @@ func TestUnixVolumeGetFuncWorkerError(t *testing.T) { v := NewTestableUnixVolume(t, false, false) defer v.Teardown() - v.Put(TestHash, TestBlock) + v.Put(context.Background(), TestHash, TestBlock) mockErr := errors.New("Mock error") - err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error { + err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { return mockErr }) if err != mockErr { @@ -246,7 +255,7 @@ func TestUnixVolumeGetFuncFileError(t *testing.T) { defer v.Teardown() funcCalled := false - err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error { + err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { funcCalled = true return nil }) @@ -262,13 +271,13 @@ func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) { v := NewTestableUnixVolume(t, false, false) defer v.Teardown() - v.Put(TestHash, TestBlock) + v.Put(context.Background(), TestHash, TestBlock) mtx := NewMockMutex() v.locker = mtx funcCalled := make(chan struct{}) - go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error { + go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { funcCalled <- struct{}{} return nil }) @@ -297,39 +306,137 @@ func TestUnixVolumeCompare(t *testing.T) { v := NewTestableUnixVolume(t, false, false) defer v.Teardown() - v.Put(TestHash, TestBlock) - err := v.Compare(TestHash, TestBlock) + v.Put(context.Background(), TestHash, TestBlock) + err := v.Compare(context.Background(), TestHash, TestBlock) if err != nil { t.Errorf("Got err %q, expected nil", err) } - err = v.Compare(TestHash, []byte("baddata")) + err = v.Compare(context.Background(), TestHash, []byte("baddata")) if err != CollisionError { t.Errorf("Got err %q, expected %q", err, CollisionError) } - v.Put(TestHash, []byte("baddata")) - err = v.Compare(TestHash, TestBlock) + v.Put(context.Background(), TestHash, []byte("baddata")) + err = v.Compare(context.Background(), TestHash, TestBlock) if err != DiskHashError { t.Errorf("Got err %q, expected %q", err, DiskHashError) } p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash) os.Chmod(p, 000) - err = v.Compare(TestHash, TestBlock) + err = v.Compare(context.Background(), TestHash, TestBlock) if err == nil || strings.Index(err.Error(), "permission denied") < 0 { t.Errorf("Got err %q, expected %q", err, "permission denied") } } -// TODO(twp): show that the underlying Read/Write operations executed -// serially and not concurrently. The easiest way to do this is -// probably to activate verbose or debug logging, capture log output -// and examine it to confirm that Reads and Writes did not overlap. -// -// TODO(twp): a proper test of I/O serialization requires that a -// second request start while the first one is still underway. -// Guaranteeing that the test behaves this way requires some tricky -// synchronization and mocking. For now we'll just launch a bunch of -// requests simultaenously in goroutines and demonstrate that they -// return accurate results. +func TestUnixVolumeContextCancelPut(t *testing.T) { + v := NewTestableUnixVolume(t, true, false) + defer v.Teardown() + v.locker.Lock() + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + time.Sleep(50 * time.Millisecond) + v.locker.Unlock() + }() + err := v.Put(ctx, TestHash, TestBlock) + if err != context.Canceled { + t.Errorf("Put() returned %s -- expected short read / canceled", err) + } +} + +func TestUnixVolumeContextCancelGet(t *testing.T) { + v := NewTestableUnixVolume(t, false, false) + defer v.Teardown() + bpath := v.blockPath(TestHash) + v.PutRaw(TestHash, TestBlock) + os.Remove(bpath) + err := syscall.Mkfifo(bpath, 0600) + if err != nil { + t.Fatalf("Mkfifo %s: %s", bpath, err) + } + defer os.Remove(bpath) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + }() + buf := make([]byte, len(TestBlock)) + n, err := v.Get(ctx, TestHash, buf) + if n == len(TestBlock) || err != context.Canceled { + t.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err) + } +} + +var _ = check.Suite(&UnixVolumeSuite{}) + +type UnixVolumeSuite struct { + volume *TestableUnixVolume +} + +func (s *UnixVolumeSuite) TearDownTest(c *check.C) { + if s.volume != nil { + s.volume.Teardown() + } +} + +func (s *UnixVolumeSuite) TestStats(c *check.C) { + s.volume = NewTestableUnixVolume(c, false, false) + stats := func() string { + buf, err := json.Marshal(s.volume.InternalStats()) + c.Check(err, check.IsNil) + return string(buf) + } + + c.Check(stats(), check.Matches, `.*"StatOps":0,.*`) + c.Check(stats(), check.Matches, `.*"Errors":0,.*`) + + loc := "acbd18db4cc2f85cedef654fccc4a4d8" + _, err := s.volume.Get(context.Background(), loc, make([]byte, 3)) + c.Check(err, check.NotNil) + c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`) + c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`) + c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`) + c.Check(stats(), check.Matches, `.*"InBytes":0,.*`) + c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`) + c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`) + + err = s.volume.Put(context.Background(), loc, []byte("foo")) + c.Check(err, check.IsNil) + c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`) + c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`) + c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`) + c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`) + + err = s.volume.Touch(loc) + c.Check(err, check.IsNil) + c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`) + c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`) + c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`) + + _, err = s.volume.Get(context.Background(), loc, make([]byte, 3)) + c.Check(err, check.IsNil) + err = s.volume.Compare(context.Background(), loc, []byte("foo")) + c.Check(err, check.IsNil) + c.Check(stats(), check.Matches, `.*"InBytes":6,.*`) + c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`) + + err = s.volume.Trash(loc) + c.Check(err, check.IsNil) + c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`) +} + +func (s *UnixVolumeSuite) TestConfig(c *check.C) { + var cfg Config + err := yaml.Unmarshal([]byte(` +Volumes: + - Type: Directory + StorageClasses: ["class_a", "class_b"] +`), &cfg) + + c.Check(err, check.IsNil) + c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"}) +}